This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new ec055e1acb [feature](new file reader) Integrate new file reader (#15175)
ec055e1acb is described below

commit ec055e1acbcc040d97ed3d15b88af2a695a4078b
Author: Tiewei Fang <43782773+bepppo...@users.noreply.github.com>
AuthorDate: Mon Dec 26 08:55:52 2022 +0800

    [feature](new file reader) Integrate new file reader (#15175)
---
 be/src/http/action/stream_load.cpp | 13 +-
 be/src/io/CMakeLists.txt | 3 +-
 be/src/io/buffered_reader.cpp | 12 +-
 be/src/io/buffered_reader.h | 5 +-
 be/src/io/file_factory.cpp | 142 ++++++++++---
 be/src/io/file_factory.h | 41 ++++
 be/src/io/fs/hdfs_file_reader.cpp | 11 +-
 be/src/io/fs/hdfs_file_system.cpp | 42 ++--
 be/src/io/fs/hdfs_file_system.h | 2 +-
 .../fs/kafka_consumer_pipe.h} | 11 +-
 ...m_load_pipe_reader.cpp => stream_load_pipe.cpp} | 53 +++--
 ...tream_load_pipe_reader.h => stream_load_pipe.h} | 16 +-
 be/src/io/hdfs_reader_writer.cpp | 44 ----
 be/src/runtime/exec_env.h | 9 +
 be/src/runtime/fragment_mgr.cpp | 23 +--
 be/src/runtime/fragment_mgr.h | 9 +-
 be/src/runtime/routine_load/data_consumer.h | 2 -
 .../runtime/routine_load/data_consumer_group.cpp | 12 +-
 .../routine_load/routine_load_task_executor.cpp | 28 +--
 be/src/runtime/stream_load/new_load_stream_mgr.cpp | 4 +-
 be/src/runtime/stream_load/new_load_stream_mgr.h | 8 +-
 be/src/runtime/stream_load/stream_load_context.h | 3 +-
 be/src/util/hdfs_storage_backend.cpp | 5 +-
 be/src/vec/CMakeLists.txt | 2 +-
 be/src/vec/exec/format/csv/csv_reader.cpp | 65 ++++--
 be/src/vec/exec/format/csv/csv_reader.h | 12 +-
 .../exec/format/file_reader/new_file_factory.cpp | 201 ------------------
 .../vec/exec/format/file_reader/new_file_factory.h | 115 -----------
 .../file_reader/new_plain_binary_line_reader.cpp | 71 +++++++
 .../file_reader/new_plain_binary_line_reader.h} | 27 ++-
 .../file_reader/new_plain_text_line_reader.cpp | 18 +-
 .../file_reader/new_plain_text_line_reader.h | 8 +-
 be/src/vec/exec/format/json/new_json_reader.cpp | 79 ++++---
 be/src/vec/exec/format/json/new_json_reader.h | 19 +-
 be/src/vec/exec/format/orc/vorc_reader.cpp | 57 +++--
 be/src/vec/exec/format/orc/vorc_reader.h | 17 +-
 .../vec/exec/format/parquet/parquet_thrift_util.h | 18 +-
 .../exec/format/parquet/vparquet_column_reader.cpp | 10 +-
 .../exec/format/parquet/vparquet_column_reader.h | 8 +-
 .../exec/format/parquet/vparquet_group_reader.cpp | 2 +-
 .../exec/format/parquet/vparquet_group_reader.h | 5 +-
 be/src/vec/exec/format/parquet/vparquet_reader.cpp | 36 ++--
 be/src/vec/exec/format/parquet/vparquet_reader.h | 8 +-
 .../runtime/routine_load_task_executor_test.cpp | 22 +-
 be/test/vec/exec/parquet/parquet_reader_test.cpp | 7 +-
 be/test/vec/exec/parquet/parquet_thrift_test.cpp | 78 ++++---
 .../data/load_p0/stream_load/test_json_load.out | 165 ---------------
 .../load_p0/stream_load/test_json_load.groovy | 230 ++-------------------
 48 files changed, 708 insertions(+), 1070 deletions(-)

diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp
index 5097de5c0d..6595bb6d90 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -40,16 +40,16 @@ #include "http/http_request.h" #include "http/http_response.h" #include "http/utils.h" +#include "io/fs/stream_load_pipe.h" #include "olap/storage_engine.h" #include "runtime/client_cache.h" #include "runtime/exec_env.h" #include "runtime/fragment_mgr.h" #include "runtime/load_path_mgr.h" #include "runtime/plan_fragment_executor.h" -#include "runtime/stream_load/load_stream_mgr.h"
+#include "runtime/stream_load/new_load_stream_mgr.h" #include "runtime/stream_load/stream_load_context.h" #include "runtime/stream_load/stream_load_executor.h" -#include "runtime/stream_load/stream_load_pipe.h" #include "runtime/stream_load/stream_load_recorder.h" #include "util/byte_buffer.h" #include "util/debug_util.h" @@ -396,10 +396,11 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext* request.__set_header_type(ctx->header_type); request.__set_loadId(ctx->id.to_thrift()); if (ctx->use_streaming) { - auto pipe = std::make_shared<StreamLoadPipe>(kMaxPipeBufferedBytes /* max_buffered_bytes */, - 64 * 1024 /* min_chunk_size */, - ctx->body_bytes /* total_length */); - RETURN_IF_ERROR(_exec_env->load_stream_mgr()->put(ctx->id, pipe)); + auto pipe = std::make_shared<io::StreamLoadPipe>( + kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */, + ctx->body_bytes /* total_length */); + RETURN_IF_ERROR(_exec_env->new_load_stream_mgr()->put(ctx->id, pipe)); + request.fileType = TFileType::FILE_STREAM; ctx->body_sink = pipe; } else { diff --git a/be/src/io/CMakeLists.txt b/be/src/io/CMakeLists.txt index c19188deb4..a8ea5ff232 100644 --- a/be/src/io/CMakeLists.txt +++ b/be/src/io/CMakeLists.txt @@ -28,7 +28,6 @@ set(IO_FILES file_factory.cpp hdfs_builder.cpp hdfs_file_reader.cpp - hdfs_reader_writer.cpp hdfs_writer.cpp local_file_reader.cpp local_file_writer.cpp @@ -45,7 +44,7 @@ set(IO_FILES fs/hdfs_file_reader.cpp fs/broker_file_system.cpp fs/broker_file_reader.cpp - fs/stream_load_pipe_reader.cpp + fs/stream_load_pipe.cpp cache/dummy_file_cache.cpp cache/file_cache.cpp cache/file_cache_manager.cpp diff --git a/be/src/io/buffered_reader.cpp b/be/src/io/buffered_reader.cpp index 021f0b9d23..d3944532cf 100644 --- a/be/src/io/buffered_reader.cpp +++ b/be/src/io/buffered_reader.cpp @@ -21,6 +21,7 @@ #include <sstream> #include "common/config.h" +#include "olap/iterators.h" #include "olap/olap_define.h" #include "util/bit_util.h" @@ -185,7 +186,7 @@ bool BufferedReader::closed() { return _reader->closed(); } -BufferedFileStreamReader::BufferedFileStreamReader(FileReader* file, uint64_t offset, +BufferedFileStreamReader::BufferedFileStreamReader(io::FileReaderSPtr file, uint64_t offset, uint64_t length, size_t max_buf_size) : _file(file), _file_start_offset(offset), @@ -223,11 +224,12 @@ Status BufferedFileStreamReader::read_bytes(const uint8_t** buf, uint64_t offset int64_t has_read = 0; SCOPED_RAW_TIMER(&_statistics.read_time); while (has_read < to_read) { - int64_t loop_read = 0; - RETURN_IF_ERROR(_file->readat(_buf_end_offset + has_read, to_read - has_read, &loop_read, - _buf.get() + buf_remaining + has_read)); + size_t loop_read = 0; + Slice resutl(_buf.get() + buf_remaining + has_read, to_read - has_read); + IOContext io_context; + RETURN_IF_ERROR(_file->read_at(_buf_end_offset + has_read, resutl, io_context, &loop_read)); _statistics.read_calls++; - if (loop_read <= 0) { + if (loop_read == 0) { break; } has_read += loop_read; diff --git a/be/src/io/buffered_reader.h b/be/src/io/buffered_reader.h index abcf24916e..503f5af5ae 100644 --- a/be/src/io/buffered_reader.h +++ b/be/src/io/buffered_reader.h @@ -23,6 +23,7 @@ #include "common/status.h" #include "io/file_reader.h" +#include "io/fs/file_reader.h" #include "olap/olap_define.h" #include "util/runtime_profile.h" @@ -113,7 +114,7 @@ protected: class BufferedFileStreamReader : public BufferedStreamReader { public: - BufferedFileStreamReader(FileReader* file, uint64_t offset, uint64_t 
length, + BufferedFileStreamReader(io::FileReaderSPtr file, uint64_t offset, uint64_t length, size_t max_buf_size); ~BufferedFileStreamReader() override = default; @@ -122,7 +123,7 @@ public: private: std::unique_ptr<uint8_t[]> _buf; - FileReader* _file; + io::FileReaderSPtr _file; uint64_t _file_start_offset; uint64_t _file_end_offset; diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp index 61dbfbfbf4..2c8b56ea50 100644 --- a/be/src/io/file_factory.cpp +++ b/be/src/io/file_factory.cpp @@ -17,22 +17,32 @@ #include "io/file_factory.h" +#include "common/status.h" #include "io/broker_reader.h" #include "io/broker_writer.h" #include "io/buffered_reader.h" -#include "io/hdfs_reader_writer.h" +#include "io/fs/broker_file_system.h" +#include "io/fs/file_system.h" +#include "io/fs/hdfs_file_system.h" +#include "io/fs/local_file_system.h" +#include "io/fs/s3_file_system.h" +#include "io/hdfs_file_reader.h" +#include "io/hdfs_writer.h" #include "io/local_file_reader.h" #include "io/local_file_writer.h" #include "io/s3_reader.h" #include "io/s3_writer.h" #include "runtime/exec_env.h" #include "runtime/stream_load/load_stream_mgr.h" +#include "runtime/stream_load/new_load_stream_mgr.h" -doris::Status doris::FileFactory::create_file_writer( - TFileType::type type, doris::ExecEnv* env, - const std::vector<TNetworkAddress>& broker_addresses, - const std::map<std::string, std::string>& properties, const std::string& path, - int64_t start_offset, std::unique_ptr<FileWriter>& file_writer) { +namespace doris { + +Status FileFactory::create_file_writer(TFileType::type type, ExecEnv* env, + const std::vector<TNetworkAddress>& broker_addresses, + const std::map<std::string, std::string>& properties, + const std::string& path, int64_t start_offset, + std::unique_ptr<FileWriter>& file_writer) { switch (type) { case TFileType::FILE_LOCAL: { file_writer.reset(new LocalFileWriter(path, start_offset)); @@ -47,7 +57,7 @@ doris::Status doris::FileFactory::create_file_writer( break; } case TFileType::FILE_HDFS: { - RETURN_IF_ERROR(HdfsReaderWriter::create_writer( + RETURN_IF_ERROR(create_hdfs_writer( const_cast<std::map<std::string, std::string>&>(properties), path, file_writer)); break; } @@ -60,11 +70,11 @@ doris::Status doris::FileFactory::create_file_writer( // ============================ // broker scan node/unique ptr -doris::Status doris::FileFactory::create_file_reader( - doris::TFileType::type type, doris::ExecEnv* env, RuntimeProfile* profile, - const std::vector<TNetworkAddress>& broker_addresses, - const std::map<std::string, std::string>& properties, const doris::TBrokerRangeDesc& range, - int64_t start_offset, std::unique_ptr<FileReader>& file_reader) { +Status FileFactory::create_file_reader(TFileType::type type, ExecEnv* env, RuntimeProfile* profile, + const std::vector<TNetworkAddress>& broker_addresses, + const std::map<std::string, std::string>& properties, + const TBrokerRangeDesc& range, int64_t start_offset, + std::unique_ptr<FileReader>& file_reader) { FileReader* file_reader_ptr; switch (type) { case TFileType::FILE_LOCAL: { @@ -85,8 +95,7 @@ doris::Status doris::FileFactory::create_file_reader( } case TFileType::FILE_HDFS: { FileReader* hdfs_reader = nullptr; - RETURN_IF_ERROR(HdfsReaderWriter::create_reader(range.hdfs_params, range.path, start_offset, - &hdfs_reader)); + hdfs_reader = new HdfsFileReader(range.hdfs_params, range.path, start_offset); file_reader_ptr = new BufferedReader(profile, hdfs_reader); break; } @@ -100,13 +109,12 @@ doris::Status 
doris::FileFactory::create_file_reader( // ============================ // file scan node/unique ptr -doris::Status doris::FileFactory::create_file_reader(RuntimeProfile* profile, - const TFileScanRangeParams& params, - const std::string& path, int64_t start_offset, - int64_t file_size, int64_t buffer_size, - std::unique_ptr<FileReader>& file_reader) { +Status FileFactory::create_file_reader(RuntimeProfile* profile, const TFileScanRangeParams& params, + const std::string& path, int64_t start_offset, + int64_t file_size, int64_t buffer_size, + std::unique_ptr<FileReader>& file_reader) { FileReader* file_reader_ptr; - doris::TFileType::type type = params.file_type; + TFileType::type type = params.file_type; switch (type) { case TFileType::FILE_LOCAL: { file_reader_ptr = new LocalFileReader(path, start_offset); @@ -117,8 +125,7 @@ doris::Status doris::FileFactory::create_file_reader(RuntimeProfile* profile, break; } case TFileType::FILE_HDFS: { - RETURN_IF_ERROR(HdfsReaderWriter::create_reader(params.hdfs_params, path, start_offset, - &file_reader_ptr)); + file_reader_ptr = new HdfsFileReader(params.hdfs_params, path, start_offset); break; } case TFileType::FILE_BROKER: { @@ -138,12 +145,99 @@ doris::Status doris::FileFactory::create_file_reader(RuntimeProfile* profile, return Status::OK(); } +Status FileFactory::create_file_reader(RuntimeProfile* /*profile*/, + const FileSystemProperties& system_properties, + const FileDescription& file_description, + std::unique_ptr<io::FileSystem>* file_system, + io::FileReaderSPtr* file_reader) { + TFileType::type type = system_properties.system_type; + io::FileSystem* file_system_ptr = nullptr; + switch (type) { + case TFileType::FILE_LOCAL: { + RETURN_IF_ERROR( + io::global_local_filesystem()->open_file(file_description.path, file_reader)); + break; + } + case TFileType::FILE_S3: { + RETURN_IF_ERROR(create_s3_reader(system_properties.properties, file_description.path, + &file_system_ptr, file_reader)); + break; + } + case TFileType::FILE_HDFS: { + RETURN_IF_ERROR(create_hdfs_reader(system_properties.hdfs_params, file_description.path, + &file_system_ptr, file_reader)); + break; + } + case TFileType::FILE_BROKER: { + RETURN_IF_ERROR(create_broker_reader(system_properties.broker_addresses[0], + system_properties.properties, file_description.path, + &file_system_ptr, file_reader)); + break; + } + default: + return Status::NotSupported("unsupported file reader type: {}", std::to_string(type)); + } + file_system->reset(file_system_ptr); + return Status::OK(); +} + // file scan node/stream load pipe -doris::Status doris::FileFactory::create_pipe_reader(const TUniqueId& load_id, - std::shared_ptr<FileReader>& file_reader) { +Status FileFactory::create_pipe_reader(const TUniqueId& load_id, io::FileReaderSPtr* file_reader) { + *file_reader = ExecEnv::GetInstance()->new_load_stream_mgr()->get(load_id); + if (!(*file_reader)) { + return Status::InternalError("unknown stream load id: {}", UniqueId(load_id).to_string()); + } + return Status::OK(); +} + +Status FileFactory::create_pipe_reader(const TUniqueId& load_id, + std::shared_ptr<FileReader>& file_reader) { file_reader = ExecEnv::GetInstance()->load_stream_mgr()->get(load_id); if (!file_reader) { return Status::InternalError("unknown stream load id: {}", UniqueId(load_id).to_string()); } return Status::OK(); } + +Status FileFactory::create_hdfs_reader(const THdfsParams& hdfs_params, const std::string& path, + io::FileSystem** hdfs_file_system, + io::FileReaderSPtr* reader) { + *hdfs_file_system = new 
io::HdfsFileSystem(hdfs_params, ""); + RETURN_IF_ERROR((dynamic_cast<io::HdfsFileSystem*>(*hdfs_file_system))->connect()); + RETURN_IF_ERROR((*hdfs_file_system)->open_file(path, reader)); + return Status::OK(); +} + +Status FileFactory::create_hdfs_writer(const std::map<std::string, std::string>& properties, + const std::string& path, + std::unique_ptr<FileWriter>& writer) { + writer.reset(new HDFSWriter(properties, path)); + return Status::OK(); +} + +Status FileFactory::create_s3_reader(const std::map<std::string, std::string>& prop, + const std::string& path, io::FileSystem** s3_file_system, + io::FileReaderSPtr* reader) { + S3URI s3_uri(path); + if (!s3_uri.parse()) { + return Status::InvalidArgument("s3 uri is invalid: {}", path); + } + S3Conf s3_conf; + RETURN_IF_ERROR(ClientFactory::convert_properties_to_s3_conf(prop, s3_uri, &s3_conf)); + *s3_file_system = new io::S3FileSystem(s3_conf, ""); + RETURN_IF_ERROR((dynamic_cast<io::S3FileSystem*>(*s3_file_system))->connect()); + RETURN_IF_ERROR((*s3_file_system)->open_file(s3_uri.get_key(), reader)); + return Status::OK(); +} + +Status FileFactory::create_broker_reader(const TNetworkAddress& broker_addr, + const std::map<std::string, std::string>& prop, + const std::string& path, + io::FileSystem** broker_file_system, + io::FileReaderSPtr* reader) { + *broker_file_system = new io::BrokerFileSystem(broker_addr, prop); + RETURN_IF_ERROR((dynamic_cast<io::BrokerFileSystem*>(*broker_file_system))->connect()); + RETURN_IF_ERROR((*broker_file_system)->open_file(path, reader)); + return Status::OK(); +} +} // namespace doris diff --git a/be/src/io/file_factory.h b/be/src/io/file_factory.h index 48e22737eb..69d1158816 100644 --- a/be/src/io/file_factory.h +++ b/be/src/io/file_factory.h @@ -20,12 +20,29 @@ #include "gen_cpp/Types_types.h" #include "io/file_reader.h" #include "io/file_writer.h" +#include "io/fs/file_reader.h" namespace doris { +namespace io { +class FileSystem; +} class ExecEnv; class TNetworkAddress; class RuntimeProfile; +struct FileSystemProperties { + TFileType::type system_type; + std::map<std::string, std::string> properties; + THdfsParams hdfs_params; + std::vector<TNetworkAddress> broker_addresses; +}; + +struct FileDescription { + std::string path; + int64_t start_offset; + size_t file_size; +}; + class FileFactory { public: // Create FileWriter @@ -53,10 +70,34 @@ public: int64_t file_size, int64_t buffer_size, std::unique_ptr<FileReader>& file_reader); + static Status create_file_reader(RuntimeProfile* profile, + const FileSystemProperties& system_properties, + const FileDescription& file_description, + std::unique_ptr<io::FileSystem>* file_system, + io::FileReaderSPtr* file_reader); + // Create FileReader for stream load pipe + static Status create_pipe_reader(const TUniqueId& load_id, io::FileReaderSPtr* file_reader); + + // [deprecated] Create FileReader for stream load pipe static Status create_pipe_reader(const TUniqueId& load_id, std::shared_ptr<FileReader>& file_reader); + static Status create_hdfs_reader(const THdfsParams& hdfs_params, const std::string& path, + io::FileSystem** hdfs_file_system, io::FileReaderSPtr* reader); + + static Status create_hdfs_writer(const std::map<std::string, std::string>& properties, + const std::string& path, std::unique_ptr<FileWriter>& writer); + + static Status create_s3_reader(const std::map<std::string, std::string>& prop, + const std::string& path, io::FileSystem** s3_file_system, + io::FileReaderSPtr* reader); + + static Status create_broker_reader(const TNetworkAddress& 
broker_addr, + const std::map<std::string, std::string>& prop, + const std::string& path, io::FileSystem** hdfs_file_system, + io::FileReaderSPtr* reader); + static TFileType::type convert_storage_type(TStorageBackendType::type type) { switch (type) { case TStorageBackendType::LOCAL: diff --git a/be/src/io/fs/hdfs_file_reader.cpp b/be/src/io/fs/hdfs_file_reader.cpp index ef03541387..39c9795e95 100644 --- a/be/src/io/fs/hdfs_file_reader.cpp +++ b/be/src/io/fs/hdfs_file_reader.cpp @@ -61,6 +61,14 @@ Status HdfsFileReader::read_at(size_t offset, Slice result, const IOContext& /*i return Status::IOError("offset exceeds file size(offset: {}, file size: {}, path: {})", offset, _file_size, _path.native()); } + + auto handle = _fs->get_handle(); + int res = hdfsSeek(handle->hdfs_fs, _hdfs_file, offset); + if (res != 0) { + return Status::InternalError("Seek to offset failed. (BE: {}) offset={}, err: {}", + BackendOptions::get_localhost(), offset, hdfsGetLastError()); + } + size_t bytes_req = result.size; char* to = result.data; bytes_req = std::min(bytes_req, _file_size - offset); @@ -69,8 +77,7 @@ Status HdfsFileReader::read_at(size_t offset, Slice result, const IOContext& /*i return Status::OK(); } - auto handle = _fs->get_handle(); - int64_t has_read = 0; + size_t has_read = 0; while (has_read < bytes_req) { int64_t loop_read = hdfsRead(handle->hdfs_fs, _hdfs_file, to + has_read, bytes_req - has_read); diff --git a/be/src/io/fs/hdfs_file_system.cpp b/be/src/io/fs/hdfs_file_system.cpp index cafa7ca34c..b053aeebb4 100644 --- a/be/src/io/fs/hdfs_file_system.cpp +++ b/be/src/io/fs/hdfs_file_system.cpp @@ -63,14 +63,8 @@ private: HdfsFileSystem::HdfsFileSystem(const THdfsParams& hdfs_params, const std::string& path) : RemoteFileSystem(path, "", FileSystemType::HDFS), _hdfs_params(hdfs_params), - _path(path), _fs_handle(nullptr) { _namenode = _hdfs_params.fs_name; - // if the format of _path is hdfs://ip:port/path, replace it to /path. - // path like hdfs://ip:port/path can't be used by libhdfs3. - if (_path.find(_namenode) != std::string::npos) { - _path = _path.substr(_namenode.size()); - } } HdfsFileSystem::~HdfsFileSystem() { @@ -101,9 +95,12 @@ Status HdfsFileSystem::create_file(const Path& /*path*/, FileWriterPtr* /*writer Status HdfsFileSystem::open_file(const Path& path, FileReaderSPtr* reader) { CHECK_HDFS_HANDLE(_fs_handle); - size_t file_len = -1; + size_t file_len = 0; RETURN_IF_ERROR(file_size(path, &file_len)); - auto hdfs_file = hdfsOpenFile(_fs_handle->hdfs_fs, path.string().c_str(), O_RDONLY, 0, 0, 0); + + Path real_path = _covert_path(path); + auto hdfs_file = + hdfsOpenFile(_fs_handle->hdfs_fs, real_path.string().c_str(), O_RDONLY, 0, 0, 0); if (hdfs_file == nullptr) { if (_fs_handle->from_cache) { // hdfsFS may be disconnected if not used for a long time @@ -130,9 +127,10 @@ Status HdfsFileSystem::open_file(const Path& path, FileReaderSPtr* reader) { Status HdfsFileSystem::delete_file(const Path& path) { CHECK_HDFS_HANDLE(_fs_handle); + Path real_path = _covert_path(path); // The recursive argument `is_recursive` is irrelevant if path is a file. 
int is_recursive = 0; - int res = hdfsDelete(_fs_handle->hdfs_fs, path.string().c_str(), is_recursive); + int res = hdfsDelete(_fs_handle->hdfs_fs, real_path.string().c_str(), is_recursive); if (res == -1) { return Status::InternalError("Failed to delete file {}", path.string()); } @@ -141,7 +139,8 @@ Status HdfsFileSystem::delete_file(const Path& path) { Status HdfsFileSystem::create_directory(const Path& path) { CHECK_HDFS_HANDLE(_fs_handle); - int res = hdfsCreateDirectory(_fs_handle->hdfs_fs, path.string().c_str()); + Path real_path = _covert_path(path); + int res = hdfsCreateDirectory(_fs_handle->hdfs_fs, real_path.string().c_str()); if (res == -1) { return Status::InternalError("Failed to create directory {}", path.string()); } @@ -150,9 +149,10 @@ Status HdfsFileSystem::create_directory(const Path& path) { Status HdfsFileSystem::delete_directory(const Path& path) { CHECK_HDFS_HANDLE(_fs_handle); + Path real_path = _covert_path(path); // delete in recursive mode int is_recursive = 1; - int res = hdfsDelete(_fs_handle->hdfs_fs, path.string().c_str(), is_recursive); + int res = hdfsDelete(_fs_handle->hdfs_fs, real_path.string().c_str(), is_recursive); if (res == -1) { return Status::InternalError("Failed to delete directory {}", path.string()); } @@ -161,7 +161,8 @@ Status HdfsFileSystem::delete_directory(const Path& path) { Status HdfsFileSystem::exists(const Path& path, bool* res) const { CHECK_HDFS_HANDLE(_fs_handle); - int is_exists = hdfsExists(_fs_handle->hdfs_fs, path.string().c_str()); + Path real_path = _covert_path(path); + int is_exists = hdfsExists(_fs_handle->hdfs_fs, real_path.string().c_str()); if (is_exists == 0) { *res = true; } else { @@ -172,7 +173,8 @@ Status HdfsFileSystem::exists(const Path& path, bool* res) const { Status HdfsFileSystem::file_size(const Path& path, size_t* file_size) const { CHECK_HDFS_HANDLE(_fs_handle); - hdfsFileInfo* file_info = hdfsGetPathInfo(_fs_handle->hdfs_fs, path.string().c_str()); + Path real_path = _covert_path(path); + hdfsFileInfo* file_info = hdfsGetPathInfo(_fs_handle->hdfs_fs, real_path.string().c_str()); if (file_info == nullptr) { return Status::InternalError("Failed to get file size of {}", path.string()); } @@ -183,9 +185,10 @@ Status HdfsFileSystem::file_size(const Path& path, size_t* file_size) const { Status HdfsFileSystem::list(const Path& path, std::vector<Path>* files) { CHECK_HDFS_HANDLE(_fs_handle); + Path real_path = _covert_path(path); int numEntries = 0; hdfsFileInfo* file_info = - hdfsListDirectory(_fs_handle->hdfs_fs, path.string().c_str(), &numEntries); + hdfsListDirectory(_fs_handle->hdfs_fs, real_path.string().c_str(), &numEntries); if (file_info == nullptr) { return Status::InternalError("Failed to list files/directors of {}", path.string()); } @@ -200,6 +203,17 @@ HdfsFileSystemHandle* HdfsFileSystem::get_handle() { return _fs_handle; } +Path HdfsFileSystem::_covert_path(const Path& path) const { + // if the format of path is hdfs://ip:port/path, replace it to /path. + // path like hdfs://ip:port/path can't be used by libhdfs3. 
+ Path real_path(path); + if (path.string().find(_namenode) != std::string::npos) { + std::string real_path_str = path.string().substr(_namenode.size()); + real_path = real_path_str; + } + return real_path; +} + // ************* HdfsFileSystemCache ****************** int HdfsFileSystemCache::MAX_CACHE_HANDLE = 64; diff --git a/be/src/io/fs/hdfs_file_system.h b/be/src/io/fs/hdfs_file_system.h index f86e73d1d3..01e8da58ca 100644 --- a/be/src/io/fs/hdfs_file_system.h +++ b/be/src/io/fs/hdfs_file_system.h @@ -119,9 +119,9 @@ public: HdfsFileSystemHandle* get_handle(); private: + Path _covert_path(const Path& path) const; const THdfsParams& _hdfs_params; std::string _namenode; - std::string _path; // do not use std::shared_ptr or std::unique_ptr // _fs_handle is managed by HdfsFileSystemCache HdfsFileSystemHandle* _fs_handle; diff --git a/be/src/runtime/routine_load/kafka_consumer_pipe_reader.h b/be/src/io/fs/kafka_consumer_pipe.h similarity index 77% rename from be/src/runtime/routine_load/kafka_consumer_pipe_reader.h rename to be/src/io/fs/kafka_consumer_pipe.h index 6555057c5f..6aab83c3b2 100644 --- a/be/src/runtime/routine_load/kafka_consumer_pipe_reader.h +++ b/be/src/io/fs/kafka_consumer_pipe.h @@ -17,17 +17,16 @@ #pragma once -#include "io/fs/stream_load_pipe_reader.h" +#include "io/fs/stream_load_pipe.h" namespace doris { namespace io { -class KafkaConsumerPipeReader : public StreamLoadPipeReader { +class KafkaConsumerPipe : public StreamLoadPipe { public: - KafkaConsumerPipeReader(size_t max_buffered_bytes = 1024 * 1024, - size_t min_chunk_size = 64 * 1024) - : StreamLoadPipeReader(max_buffered_bytes, min_chunk_size) {} + KafkaConsumerPipe(size_t max_buffered_bytes = 1024 * 1024, size_t min_chunk_size = 64 * 1024) + : StreamLoadPipe(max_buffered_bytes, min_chunk_size) {} - ~KafkaConsumerPipeReader() override = default; + ~KafkaConsumerPipe() override = default; Status append_with_line_delimiter(const char* data, size_t size) { Status st = append(data, size); diff --git a/be/src/io/fs/stream_load_pipe_reader.cpp b/be/src/io/fs/stream_load_pipe.cpp similarity index 76% rename from be/src/io/fs/stream_load_pipe_reader.cpp rename to be/src/io/fs/stream_load_pipe.cpp index 6cc7b0a985..cc5132478c 100644 --- a/be/src/io/fs/stream_load_pipe_reader.cpp +++ b/be/src/io/fs/stream_load_pipe.cpp @@ -15,32 +15,34 @@ // specific language governing permissions and limitations // under the License. 
-#include "stream_load_pipe_reader.h" +#include "stream_load_pipe.h" #include <gen_cpp/internal_service.pb.h> +#include "olap/iterators.h" #include "runtime/thread_context.h" #include "util/bit_util.h" namespace doris { namespace io { -StreamLoadPipeReader::StreamLoadPipeReader(size_t max_buffered_bytes, size_t min_chunk_size, - int64_t total_length, bool use_proto) +StreamLoadPipe::StreamLoadPipe(size_t max_buffered_bytes, size_t min_chunk_size, + int64_t total_length, bool use_proto) : _buffered_bytes(0), _proto_buffered_bytes(0), _max_buffered_bytes(max_buffered_bytes), _min_chunk_size(min_chunk_size), + _total_length(total_length), _use_proto(use_proto) {} -StreamLoadPipeReader::~StreamLoadPipeReader() { +StreamLoadPipe::~StreamLoadPipe() { SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker()); while (!_buf_queue.empty()) { _buf_queue.pop_front(); } } -Status StreamLoadPipeReader::read_at(size_t /*offset*/, Slice result, const IOContext& /*io_ctx*/, - size_t* bytes_read) { +Status StreamLoadPipe::read_at(size_t /*offset*/, Slice result, const IOContext& /*io_ctx*/, + size_t* bytes_read) { SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker()); *bytes_read = 0; size_t bytes_req = result.size; @@ -79,15 +81,38 @@ Status StreamLoadPipeReader::read_at(size_t /*offset*/, Slice result, const IOCo return Status::OK(); } -Status StreamLoadPipeReader::append_and_flush(const char* data, size_t size, - size_t proto_byte_size) { +// If _total_length == -1, this should be a Kafka routine load task, +// just get the next buffer directly from the buffer queue, because one buffer contains a complete piece of data. +// Otherwise, this should be a stream load task that needs to read the specified amount of data. 
+Status StreamLoadPipe::read_one_message(std::unique_ptr<uint8_t[]>* data, size_t* length) { + if (_total_length < -1) { + return Status::InternalError("invalid, _total_length is: {}", _total_length); + } else if (_total_length == 0) { + // no data + *length = 0; + return Status::OK(); + } + + if (_total_length == -1) { + return _read_next_buffer(data, length); + } + + // _total_length > 0, read the entire data + data->reset(new uint8_t[_total_length]); + Slice result(data->get(), _total_length); + IOContext io_ctx; + Status st = read_at(0, result, io_ctx, length); + return st; +} + +Status StreamLoadPipe::append_and_flush(const char* data, size_t size, size_t proto_byte_size) { ByteBufferPtr buf = ByteBuffer::allocate(BitUtil::RoundUpToPowerOfTwo(size + 1)); buf->put_bytes(data, size); buf->flip(); return _append(buf, proto_byte_size); } -Status StreamLoadPipeReader::append(const char* data, size_t size) { +Status StreamLoadPipe::append(const char* data, size_t size) { size_t pos = 0; if (_write_buf != nullptr) { if (size < _write_buf->remaining()) { @@ -110,7 +135,7 @@ Status StreamLoadPipeReader::append(const char* data, size_t size) { return Status::OK(); } -Status StreamLoadPipeReader::append(const ByteBufferPtr& buf) { +Status StreamLoadPipe::append(const ByteBufferPtr& buf) { if (_write_buf != nullptr) { _write_buf->flip(); RETURN_IF_ERROR(_append(_write_buf)); @@ -120,7 +145,7 @@ Status StreamLoadPipeReader::append(const ByteBufferPtr& buf) { } // read the next buffer from _buf_queue -Status StreamLoadPipeReader::_read_next_buffer(std::unique_ptr<uint8_t[]>* data, int64_t* length) { +Status StreamLoadPipe::_read_next_buffer(std::unique_ptr<uint8_t[]>* data, size_t* length) { std::unique_lock<std::mutex> l(_lock); while (!_cancelled && !_finished && _buf_queue.empty()) { _get_cond.wait(l); @@ -150,7 +175,7 @@ Status StreamLoadPipeReader::_read_next_buffer(std::unique_ptr<uint8_t[]>* data, return Status::OK(); } -Status StreamLoadPipeReader::_append(const ByteBufferPtr& buf, size_t proto_byte_size) { +Status StreamLoadPipe::_append(const ByteBufferPtr& buf, size_t proto_byte_size) { { std::unique_lock<std::mutex> l(_lock); // if _buf_queue is empty, we append this buf without size check @@ -180,7 +205,7 @@ Status StreamLoadPipeReader::_append(const ByteBufferPtr& buf, size_t proto_byte } // called when producer finished -Status StreamLoadPipeReader::finish() { +Status StreamLoadPipe::finish() { if (_write_buf != nullptr) { _write_buf->flip(); _append(_write_buf); @@ -195,7 +220,7 @@ Status StreamLoadPipeReader::finish() { } // called when producer/consumer failed -void StreamLoadPipeReader::cancel(const std::string& reason) { +void StreamLoadPipe::cancel(const std::string& reason) { { std::lock_guard<std::mutex> l(_lock); _cancelled = true; diff --git a/be/src/io/fs/stream_load_pipe_reader.h b/be/src/io/fs/stream_load_pipe.h similarity index 85% rename from be/src/io/fs/stream_load_pipe_reader.h rename to be/src/io/fs/stream_load_pipe.h index a5cc95e3e8..59d391d5ad 100644 --- a/be/src/io/fs/stream_load_pipe_reader.h +++ b/be/src/io/fs/stream_load_pipe.h @@ -28,13 +28,13 @@ namespace io { const size_t kMaxPipeBufferedBytes = 4 * 1024 * 1024; -class StreamLoadPipeReader : public MessageBodySink, public FileReader { +class StreamLoadPipe : public MessageBodySink, public FileReader { public: - StreamLoadPipeReader(size_t max_buffered_bytes = kMaxPipeBufferedBytes, - size_t min_chunk_size = 64 * 1024, int64_t total_length = -1, - bool use_proto = false); + StreamLoadPipe(size_t 
max_buffered_bytes = kMaxPipeBufferedBytes, + size_t min_chunk_size = 64 * 1024, int64_t total_length = -1, + bool use_proto = false); - ~StreamLoadPipeReader() override; + ~StreamLoadPipe() override; Status append_and_flush(const char* data, size_t size, size_t proto_byte_size = 0); @@ -63,9 +63,11 @@ public: // called when producer/consumer failed void cancel(const std::string& reason) override; + Status read_one_message(std::unique_ptr<uint8_t[]>* data, size_t* length); + private: // read the next buffer from _buf_queue - Status _read_next_buffer(std::unique_ptr<uint8_t[]>* data, int64_t* length); + Status _read_next_buffer(std::unique_ptr<uint8_t[]>* data, size_t* length); Status _append(const ByteBufferPtr& buf, size_t proto_byte_size = 0); @@ -82,7 +84,7 @@ private: // The default is -1, which means that the data arrives in a stream // and the length is unknown. // size_t is unsigned, so use int64_t - // int64_t _total_length = -1; + int64_t _total_length = -1; bool _use_proto = false; std::deque<ByteBufferPtr> _buf_queue; std::condition_variable _put_cond; diff --git a/be/src/io/hdfs_reader_writer.cpp b/be/src/io/hdfs_reader_writer.cpp deleted file mode 100644 index 3cbe2f4436..0000000000 --- a/be/src/io/hdfs_reader_writer.cpp +++ /dev/null @@ -1,44 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -#include "io/hdfs_reader_writer.h" - -#include "io/hdfs_file_reader.h" -#include "io/hdfs_writer.h" - -namespace doris { - -Status HdfsReaderWriter::create_reader(const THdfsParams& hdfs_params, const std::string& path, - int64_t start_offset, FileReader** reader) { - *reader = new HdfsFileReader(hdfs_params, path, start_offset); - return Status::OK(); -} - -Status HdfsReaderWriter::create_reader(const std::map<std::string, std::string>& hdfs_params, - const std::string& path, int64_t start_offset, - FileReader** reader) { - *reader = new HdfsFileReader(hdfs_params, path, start_offset); - return Status::OK(); -} - -Status HdfsReaderWriter::create_writer(const std::map<std::string, std::string>& properties, - const std::string& path, - std::unique_ptr<FileWriter>& writer) { - writer.reset(new HDFSWriter(properties, path)); - return Status::OK(); -} -} // namespace doris diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 0b36a2b8e8..e3ab690168 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -182,6 +182,15 @@ public: HeartbeatFlags* heartbeat_flags() { return _heartbeat_flags; } doris::vectorized::ScannerScheduler* scanner_scheduler() { return _scanner_scheduler; } + // only for unit test + void set_master_info(TMasterInfo* master_info) { this->_master_info = master_info; } + void set_new_load_stream_mgr(NewLoadStreamMgr* new_load_stream_mgr) { + this->_new_load_stream_mgr = new_load_stream_mgr; + } + void set_stream_load_executor(StreamLoadExecutor* stream_load_executor) { + this->_stream_load_executor = stream_load_executor; + } + private: Status _init(const std::vector<StorePath>& store_paths); void _destroy(); diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 134df9f221..c89cfc69ef 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -33,6 +33,7 @@ #include "gen_cpp/QueryPlanExtra_types.h" #include "gen_cpp/Types_types.h" #include "gutil/strings/substitute.h" +#include "io/fs/stream_load_pipe.h" #include "opentelemetry/trace/scope.h" #include "pipeline/pipeline_fragment_context.h" #include "runtime/client_cache.h" @@ -41,9 +42,8 @@ #include "runtime/exec_env.h" #include "runtime/plan_fragment_executor.h" #include "runtime/runtime_filter_mgr.h" -#include "runtime/stream_load/load_stream_mgr.h" +#include "runtime/stream_load/new_load_stream_mgr.h" #include "runtime/stream_load/stream_load_context.h" -#include "runtime/stream_load/stream_load_pipe.h" #include "runtime/thread_context.h" #include "service/backend_options.h" #include "util/doris_metrics.h" @@ -137,8 +137,8 @@ public: std::shared_ptr<QueryFragmentsCtx> get_fragments_ctx() { return _fragments_ctx; } - void set_pipe(std::shared_ptr<StreamLoadPipe> pipe) { _pipe = pipe; } - std::shared_ptr<StreamLoadPipe> get_pipe() const { return _pipe; } + void set_pipe(std::shared_ptr<io::StreamLoadPipe> pipe) { _pipe = pipe; } + std::shared_ptr<io::StreamLoadPipe> get_pipe() const { return _pipe; } void set_need_wait_execution_trigger() { _need_wait_execution_trigger = true; } @@ -172,7 +172,7 @@ private: std::shared_ptr<RuntimeFilterMergeControllerEntity> _merge_controller_handler; // The pipe for data transfering, such as insert. - std::shared_ptr<StreamLoadPipe> _pipe; + std::shared_ptr<io::StreamLoadPipe> _pipe; // If set the true, this plan fragment will be executed only after FE send execution start rpc. 
bool _need_wait_execution_trigger = false; @@ -528,14 +528,13 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params) { stream_load_ctx->auth.auth_code_uuid = params.txn_conf.auth_code_uuid; stream_load_ctx->need_commit_self = true; stream_load_ctx->need_rollback = true; - // total_length == -1 means read one message from pipe in once time, don't care the length. - auto pipe = std::make_shared<StreamLoadPipe>(kMaxPipeBufferedBytes /* max_buffered_bytes */, - 64 * 1024 /* min_chunk_size */, - -1 /* total_length */, true /* use_proto */); + auto pipe = std::make_shared<io::StreamLoadPipe>( + kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */, + -1 /* total_length */, true /* use_proto */); stream_load_ctx->body_sink = pipe; stream_load_ctx->max_filter_ratio = params.txn_conf.max_filter_ratio; - RETURN_IF_ERROR(_exec_env->load_stream_mgr()->put(stream_load_ctx->id, pipe)); + RETURN_IF_ERROR(_exec_env->new_load_stream_mgr()->put(stream_load_ctx->id, pipe)); RETURN_IF_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment(stream_load_ctx)); set_pipe(params.params.fragment_instance_id, pipe); @@ -562,7 +561,7 @@ Status FragmentMgr::start_query_execution(const PExecPlanFragmentStartRequest* r } void FragmentMgr::set_pipe(const TUniqueId& fragment_instance_id, - std::shared_ptr<StreamLoadPipe> pipe) { + std::shared_ptr<io::StreamLoadPipe> pipe) { { std::lock_guard<std::mutex> lock(_lock); auto iter = _fragment_map.find(fragment_instance_id); @@ -572,7 +571,7 @@ void FragmentMgr::set_pipe(const TUniqueId& fragment_instance_id, } } -std::shared_ptr<StreamLoadPipe> FragmentMgr::get_pipe(const TUniqueId& fragment_instance_id) { +std::shared_ptr<io::StreamLoadPipe> FragmentMgr::get_pipe(const TUniqueId& fragment_instance_id) { { std::lock_guard<std::mutex> lock(_lock); auto iter = _fragment_map.find(fragment_instance_id); diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h index afd5408bbf..050db423dd 100644 --- a/be/src/runtime/fragment_mgr.h +++ b/be/src/runtime/fragment_mgr.h @@ -45,6 +45,10 @@ namespace pipeline { class PipelineFragmentContext; } +namespace io { +class StreamLoadPipe; +} + class QueryFragmentsCtx; class ExecEnv; class FragmentExecState; @@ -54,7 +58,6 @@ class TExecPlanFragmentParams; class TExecPlanFragmentParamsList; class TUniqueId; class RuntimeFilterMergeController; -class StreamLoadPipe; std::string to_load_error_http_path(const std::string& file_name); @@ -104,9 +107,9 @@ public: Status merge_filter(const PMergeFilterRequest* request, butil::IOBufAsZeroCopyInputStream* attach_data); - void set_pipe(const TUniqueId& fragment_instance_id, std::shared_ptr<StreamLoadPipe> pipe); + void set_pipe(const TUniqueId& fragment_instance_id, std::shared_ptr<io::StreamLoadPipe> pipe); - std::shared_ptr<StreamLoadPipe> get_pipe(const TUniqueId& fragment_instance_id); + std::shared_ptr<io::StreamLoadPipe> get_pipe(const TUniqueId& fragment_instance_id); private: void _exec_actual(std::shared_ptr<FragmentExecState> exec_state, FinishCallback cb); diff --git a/be/src/runtime/routine_load/data_consumer.h b/be/src/runtime/routine_load/data_consumer.h index 173ffbd82a..afd4d9f6f7 100644 --- a/be/src/runtime/routine_load/data_consumer.h +++ b/be/src/runtime/routine_load/data_consumer.h @@ -30,7 +30,6 @@ namespace doris { class KafkaConsumerPipe; class Status; -class StreamLoadPipe; class DataConsumer { public: @@ -156,7 +155,6 @@ private: KafkaEventCb _k_event_cb; RdKafka::KafkaConsumer* _k_consumer = nullptr; - 
std::shared_ptr<KafkaConsumerPipe> _k_consumer_pipe; }; } // end namespace doris diff --git a/be/src/runtime/routine_load/data_consumer_group.cpp b/be/src/runtime/routine_load/data_consumer_group.cpp index 0640591ece..869d427568 100644 --- a/be/src/runtime/routine_load/data_consumer_group.cpp +++ b/be/src/runtime/routine_load/data_consumer_group.cpp @@ -16,10 +16,10 @@ // under the License. #include "runtime/routine_load/data_consumer_group.h" +#include "io/fs/kafka_consumer_pipe.h" #include "librdkafka/rdkafka.h" #include "librdkafka/rdkafkacpp.h" #include "runtime/routine_load/data_consumer.h" -#include "runtime/routine_load/kafka_consumer_pipe.h" #include "runtime/stream_load/stream_load_context.h" namespace doris { @@ -96,8 +96,8 @@ Status KafkaDataConsumerGroup::start_all(StreamLoadContext* ctx) { int64_t left_rows = ctx->max_batch_rows; int64_t left_bytes = ctx->max_batch_size; - std::shared_ptr<KafkaConsumerPipe> kafka_pipe = - std::static_pointer_cast<KafkaConsumerPipe>(ctx->body_sink); + std::shared_ptr<io::KafkaConsumerPipe> kafka_pipe = + std::static_pointer_cast<io::KafkaConsumerPipe>(ctx->body_sink); LOG(INFO) << "start consumer group: " << _grp_id << ". max time(ms): " << left_time << ", batch rows: " << left_rows << ", batch size: " << left_bytes << ". " @@ -107,11 +107,11 @@ Status KafkaDataConsumerGroup::start_all(StreamLoadContext* ctx) { std::map<int32_t, int64_t> cmt_offset = ctx->kafka_info->cmt_offset; //improve performance - Status (KafkaConsumerPipe::*append_data)(const char* data, size_t size); + Status (io::KafkaConsumerPipe::*append_data)(const char* data, size_t size); if (ctx->format == TFileFormatType::FORMAT_JSON) { - append_data = &KafkaConsumerPipe::append_json; + append_data = &io::KafkaConsumerPipe::append_json; } else { - append_data = &KafkaConsumerPipe::append_with_line_delimiter; + append_data = &io::KafkaConsumerPipe::append_with_line_delimiter; } MonotonicStopWatch watch; diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp index a5535199e2..da8842ac59 100644 --- a/be/src/runtime/routine_load/routine_load_task_executor.cpp +++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp @@ -23,9 +23,10 @@ #include "gen_cpp/BackendService_types.h" #include "gen_cpp/FrontendService_types.h" #include "gen_cpp/Types_types.h" +#include "io/fs/kafka_consumer_pipe.h" +#include "olap/iterators.h" #include "runtime/exec_env.h" #include "runtime/routine_load/data_consumer_group.h" -#include "runtime/routine_load/kafka_consumer_pipe.h" #include "runtime/stream_load/stream_load_context.h" #include "util/defer_op.h" #include "util/uid_util.h" @@ -193,9 +194,9 @@ Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) { TStatus tstatus; tstatus.status_code = TStatusCode::OK; put_result.status = tstatus; - put_result.params = std::move(task.params); + put_result.params = task.params; put_result.__isset.params = true; - ctx->put_result = std::move(put_result); + ctx->put_result = put_result; if (task.__isset.format) { ctx->format = task.format; } @@ -267,10 +268,10 @@ void RoutineLoadTaskExecutor::exec_task(StreamLoadContext* ctx, DataConsumerPool HANDLE_ERROR(consumer_pool->get_consumer_grp(ctx, &consumer_grp), "failed to get consumers"); // create and set pipe - std::shared_ptr<StreamLoadPipe> pipe; + std::shared_ptr<io::StreamLoadPipe> pipe; switch (ctx->load_src_type) { case TLoadSourceType::KAFKA: { - pipe = std::make_shared<KafkaConsumerPipe>(); + pipe = 
std::make_shared<io::KafkaConsumerPipe>(); Status st = std::static_pointer_cast<KafkaDataConsumerGroup>(consumer_grp) ->assign_topic_partitions(ctx); if (!st.ok()) { @@ -291,7 +292,7 @@ void RoutineLoadTaskExecutor::exec_task(StreamLoadContext* ctx, DataConsumerPool ctx->body_sink = pipe; // must put pipe before executing plan fragment - HANDLE_ERROR(_exec_env->load_stream_mgr()->put(ctx->id, pipe), "failed to add pipe"); + HANDLE_ERROR(_exec_env->new_load_stream_mgr()->put(ctx->id, pipe), "failed to add pipe"); #ifndef BE_TEST // execute plan fragment, async @@ -365,32 +366,31 @@ void RoutineLoadTaskExecutor::err_handler(StreamLoadContext* ctx, const Status& _exec_env->stream_load_executor()->rollback_txn(ctx); ctx->need_rollback = false; } - if (ctx->body_sink.get() != nullptr) { + if (ctx->body_sink != nullptr) { ctx->body_sink->cancel(err_msg); } - - return; } // for test only Status RoutineLoadTaskExecutor::_execute_plan_for_test(StreamLoadContext* ctx) { auto mock_consumer = [this, ctx]() { ctx->ref(); - std::shared_ptr<StreamLoadPipe> pipe = _exec_env->load_stream_mgr()->get(ctx->id); - bool eof = false; + std::shared_ptr<io::StreamLoadPipe> pipe = _exec_env->new_load_stream_mgr()->get(ctx->id); std::stringstream ss; while (true) { char one; int64_t len = 1; - int64_t read_bytes = 0; - Status st = pipe->read((uint8_t*)&one, len, &read_bytes, &eof); + size_t read_bytes = 0; + Slice result((uint8_t*)&one, len); + IOContext io_ctx; + Status st = pipe->read_at(0, result, io_ctx, &read_bytes); if (!st.ok()) { LOG(WARNING) << "read failed"; ctx->promise.set_value(st); break; } - if (eof) { + if (read_bytes == 0) { ctx->promise.set_value(Status::OK()); break; } diff --git a/be/src/runtime/stream_load/new_load_stream_mgr.cpp b/be/src/runtime/stream_load/new_load_stream_mgr.cpp index 20cbb295cb..c7a57362fe 100644 --- a/be/src/runtime/stream_load/new_load_stream_mgr.cpp +++ b/be/src/runtime/stream_load/new_load_stream_mgr.cpp @@ -22,8 +22,8 @@ namespace doris { DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(new_stream_load_pipe_count, MetricUnit::NOUNIT); NewLoadStreamMgr::NewLoadStreamMgr() { - // Each StreamLoadPipeReader has a limited buffer size (default 1M), it's not needed to count the - // actual size of all StreamLoadPipeReader. + // Each StreamLoadPipe has a limited buffer size (default 1M), it's not needed to count the + // actual size of all StreamLoadPipe. 
REGISTER_HOOK_METRIC(new_stream_load_pipe_count, [this]() { return _stream_map.size(); }); } diff --git a/be/src/runtime/stream_load/new_load_stream_mgr.h b/be/src/runtime/stream_load/new_load_stream_mgr.h index c009fbc71e..60cd3aa446 100644 --- a/be/src/runtime/stream_load/new_load_stream_mgr.h +++ b/be/src/runtime/stream_load/new_load_stream_mgr.h @@ -21,7 +21,7 @@ #include <mutex> #include <unordered_map> -#include "io/fs/stream_load_pipe_reader.h" +#include "io/fs/stream_load_pipe.h" #include "util/doris_metrics.h" #include "util/uid_util.h" @@ -34,7 +34,7 @@ public: NewLoadStreamMgr(); ~NewLoadStreamMgr(); - Status put(const UniqueId& id, std::shared_ptr<io::StreamLoadPipeReader> stream) { + Status put(const UniqueId& id, std::shared_ptr<io::StreamLoadPipe> stream) { std::lock_guard<std::mutex> l(_lock); auto it = _stream_map.find(id); if (it != std::end(_stream_map)) { @@ -45,7 +45,7 @@ public: return Status::OK(); } - std::shared_ptr<io::StreamLoadPipeReader> get(const UniqueId& id) { + std::shared_ptr<io::StreamLoadPipe> get(const UniqueId& id) { std::lock_guard<std::mutex> l(_lock); auto it = _stream_map.find(id); if (it == std::end(_stream_map)) { @@ -67,6 +67,6 @@ public: private: std::mutex _lock; - std::unordered_map<UniqueId, std::shared_ptr<io::StreamLoadPipeReader>> _stream_map; + std::unordered_map<UniqueId, std::shared_ptr<io::StreamLoadPipe>> _stream_map; }; } // namespace doris diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h index 795b0f304b..0fc27a27f9 100644 --- a/be/src/runtime/stream_load/stream_load_context.h +++ b/be/src/runtime/stream_load/stream_load_context.h @@ -29,6 +29,7 @@ #include "gen_cpp/FrontendService_types.h" #include "runtime/exec_env.h" #include "runtime/stream_load/load_stream_mgr.h" +#include "runtime/stream_load/new_load_stream_mgr.h" #include "runtime/stream_load/stream_load_executor.h" #include "service/backend_options.h" #include "util/string_util.h" @@ -92,7 +93,7 @@ public: need_rollback = false; } - _exec_env->load_stream_mgr()->remove(id); + _exec_env->new_load_stream_mgr()->remove(id); } std::string to_json() const; diff --git a/be/src/util/hdfs_storage_backend.cpp b/be/src/util/hdfs_storage_backend.cpp index 71df59c086..83c25716f9 100644 --- a/be/src/util/hdfs_storage_backend.cpp +++ b/be/src/util/hdfs_storage_backend.cpp @@ -18,7 +18,6 @@ #include "util/hdfs_storage_backend.h" #include "io/hdfs_file_reader.h" -#include "io/hdfs_reader_writer.h" #include "io/hdfs_writer.h" #include "olap/file_helper.h" #include "util/hdfs_util.h" @@ -55,7 +54,7 @@ Status HDFSStorageBackend::close() { // if the format of path is hdfs://ip:port/path, replace it to /path. // path like hdfs://ip:port/path can't be used by libhdfs3. 
std::string HDFSStorageBackend::parse_path(const std::string& path) { - if (path.find(hdfs_file_prefix) != path.npos) { + if (path.find(hdfs_file_prefix) != std::string::npos) { std::string temp = path.substr(hdfs_file_prefix.size()); std::size_t pos = temp.find_first_of('/'); return temp.substr(pos); @@ -143,7 +142,7 @@ Status HDFSStorageBackend::list(const std::string& remote_path, bool contain_md5 // get filename std::filesystem::path file_path(file_name_with_path); std::string file_name = file_path.filename(); - size_t pos = file_name.find_last_of("."); + size_t pos = file_name.find_last_of('.'); if (pos == std::string::npos || pos == file_name.size() - 1) { // Not found checksum separator, ignore this file continue; diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt index 428f6b4f91..4426ba63a7 100644 --- a/be/src/vec/CMakeLists.txt +++ b/be/src/vec/CMakeLists.txt @@ -281,8 +281,8 @@ set(VEC_FILES exec/format/json/new_json_reader.cpp exec/format/table/table_format_reader.cpp exec/format/table/iceberg_reader.cpp - exec/format/file_reader/new_file_factory.cpp exec/format/file_reader/new_plain_text_line_reader.cpp + exec/format/file_reader/new_plain_binary_line_reader.cpp ) if (WITH_MYSQL) diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp index 48f8d84dc7..63bdbd69f2 100644 --- a/be/src/vec/exec/format/csv/csv_reader.cpp +++ b/be/src/vec/exec/format/csv/csv_reader.cpp @@ -23,14 +23,15 @@ #include "common/consts.h" #include "common/status.h" #include "exec/decompressor.h" -#include "exec/plain_binary_line_reader.h" -#include "exec/plain_text_line_reader.h" #include "exec/text_converter.h" #include "exec/text_converter.hpp" +#include "io/file_factory.h" #include "util/string_util.h" #include "util/utf8_check.h" #include "vec/core/block.h" -#include "vec/exec/scan/vfile_scanner.h" +#include "vec/exec/format/file_reader/new_plain_binary_line_reader.h" +#include "vec/exec/format/file_reader/new_plain_text_line_reader.h" +#include "vec/exec/scan/vscanner.h" namespace doris::vectorized { @@ -45,6 +46,8 @@ CsvReader::CsvReader(RuntimeState* state, RuntimeProfile* profile, ScannerCounte _params(params), _range(range), _file_slot_descs(file_slot_descs), + _file_system(nullptr), + _file_reader(nullptr), _line_reader(nullptr), _line_reader_eof(false), _text_converter(nullptr), @@ -76,7 +79,7 @@ CsvReader::CsvReader(RuntimeProfile* profile, const TFileScanRangeParams& params _size = _range.size; } -CsvReader::~CsvReader() {} +CsvReader::~CsvReader() = default; Status CsvReader::init_reader(bool is_load) { // set the skip lines and start offset @@ -102,18 +105,23 @@ Status CsvReader::init_reader(bool is_load) { _skip_lines = 1; } - // create and open file reader - FileReader* real_reader = nullptr; + FileSystemProperties system_properties; + system_properties.system_type = _params.file_type; + system_properties.properties = _params.properties; + system_properties.hdfs_params = _params.hdfs_params; + + FileDescription file_description; + file_description.path = _range.path; + file_description.start_offset = start_offset; + file_description.file_size = _range.__isset.file_size ? 
_range.file_size : 0; + if (_params.file_type == TFileType::FILE_STREAM) { - RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, _file_reader_s)); - real_reader = _file_reader_s.get(); + RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &_file_reader)); } else { RETURN_IF_ERROR(FileFactory::create_file_reader( - _profile, _params, _range.path, start_offset, _range.file_size, 0, _file_reader)); - real_reader = _file_reader.get(); + _profile, system_properties, file_description, &_file_system, &_file_reader)); } - RETURN_IF_ERROR(real_reader->open()); - if (real_reader->size() == 0 && _params.file_type != TFileType::FILE_STREAM && + if (_file_reader->size() == 0 && _params.file_type != TFileType::FILE_STREAM && _params.file_type != TFileType::FILE_BROKER) { return Status::EndOfFile("Empty File"); } @@ -135,11 +143,13 @@ Status CsvReader::init_reader(bool is_load) { case TFileFormatType::FORMAT_CSV_LZ4FRAME: case TFileFormatType::FORMAT_CSV_LZOP: case TFileFormatType::FORMAT_CSV_DEFLATE: - _line_reader.reset(new PlainTextLineReader(_profile, real_reader, _decompressor.get(), - _size, _line_delimiter, _line_delimiter_length)); + _line_reader.reset(new NewPlainTextLineReader(_profile, _file_reader, _decompressor.get(), + _size, _line_delimiter, + _line_delimiter_length, start_offset)); + break; case TFileFormatType::FORMAT_PROTO: - _line_reader.reset(new PlainBinaryLineReader(real_reader)); + _line_reader.reset(new NewPlainBinaryLineReader(_file_reader, _params.file_type)); break; default: return Status::InternalError( @@ -495,11 +505,20 @@ Status CsvReader::_prepare_parse(size_t* read_line, bool* is_parse_name) { } } - // create and open file reader - RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, _params, _range.path, start_offset, - _range.file_size, 0, _file_reader)); - RETURN_IF_ERROR(_file_reader->open()); - if (_file_reader->size() == 0) { + FileSystemProperties system_properties; + system_properties.system_type = _params.file_type; + system_properties.properties = _params.properties; + system_properties.hdfs_params = _params.hdfs_params; + + FileDescription file_description; + file_description.path = _range.path; + file_description.start_offset = start_offset; + file_description.file_size = _range.__isset.file_size ? 
_range.file_size : 0; + + RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, system_properties, file_description, + &_file_system, &_file_reader)); + if (_file_reader->size() == 0 && _params.file_type != TFileType::FILE_STREAM && + _params.file_type != TFileType::FILE_BROKER) { return Status::EndOfFile("Empty File"); } @@ -513,8 +532,10 @@ Status CsvReader::_prepare_parse(size_t* read_line, bool* is_parse_name) { // _decompressor may be nullptr if this is not a compressed file RETURN_IF_ERROR(_create_decompressor()); - _line_reader.reset(new PlainTextLineReader(_profile, _file_reader.get(), _decompressor.get(), - _size, _line_delimiter, _line_delimiter_length)); + _line_reader.reset(new NewPlainTextLineReader(_profile, _file_reader, _decompressor.get(), + _size, _line_delimiter, _line_delimiter_length, + start_offset)); + return Status::OK(); } diff --git a/be/src/vec/exec/format/csv/csv_reader.h b/be/src/vec/exec/format/csv/csv_reader.h index 5bb14523e3..3a22e0aa14 100644 --- a/be/src/vec/exec/format/csv/csv_reader.h +++ b/be/src/vec/exec/format/csv/csv_reader.h @@ -17,10 +17,11 @@ #pragma once +#include "io/fs/file_reader.h" #include "vec/exec/format/generic_reader.h" + namespace doris { -class FileReader; class LineReader; class TextConverter; class Decompressor; @@ -63,13 +64,13 @@ private: bool _is_array(const Slice& slice); // used for parse table schema of csv file. + // Currently, this feature is for table valued function. Status _prepare_parse(size_t* read_line, bool* is_parse_name); Status _parse_col_nums(size_t* col_nums); Status _parse_col_names(std::vector<std::string>* col_names); // TODO(ftw): parse type Status _parse_col_types(size_t col_nums, std::vector<TypeDescriptor>* col_types); -private: RuntimeState* _state; RuntimeProfile* _profile; ScannerCounter* _counter; @@ -84,11 +85,8 @@ private: // True if this is a load task bool _is_load = false; - // _file_reader_s is for stream load pipe reader, - // and _file_reader is for other file reader. - // TODO: refactor this to use only shared_ptr or unique_ptr - std::unique_ptr<FileReader> _file_reader; - std::shared_ptr<FileReader> _file_reader_s; + std::unique_ptr<io::FileSystem> _file_system; + io::FileReaderSPtr _file_reader; std::unique_ptr<LineReader> _line_reader; bool _line_reader_eof; std::unique_ptr<TextConverter> _text_converter; diff --git a/be/src/vec/exec/format/file_reader/new_file_factory.cpp b/be/src/vec/exec/format/file_reader/new_file_factory.cpp deleted file mode 100644 index a02b5ebdba..0000000000 --- a/be/src/vec/exec/format/file_reader/new_file_factory.cpp +++ /dev/null @@ -1,201 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -#include "vec/exec/format/file_reader/new_file_factory.h" - -#include "io/broker_reader.h" -#include "io/broker_writer.h" -#include "io/buffered_reader.h" -#include "io/fs/broker_file_system.h" -#include "io/fs/file_system.h" -#include "io/fs/hdfs_file_system.h" -#include "io/fs/s3_file_system.h" -#include "io/hdfs_file_reader.h" -#include "io/hdfs_writer.h" -#include "io/local_file_reader.h" -#include "io/local_file_writer.h" -#include "io/s3_reader.h" -#include "io/s3_writer.h" -#include "runtime/exec_env.h" -#include "runtime/stream_load/load_stream_mgr.h" -#include "runtime/stream_load/new_load_stream_mgr.h" -#include "util/s3_util.h" - -namespace doris { - -Status NewFileFactory::create_file_writer(TFileType::type type, ExecEnv* env, - const std::vector<TNetworkAddress>& broker_addresses, - const std::map<std::string, std::string>& properties, - const std::string& path, int64_t start_offset, - std::unique_ptr<FileWriter>& file_writer) { - switch (type) { - case TFileType::FILE_LOCAL: { - file_writer.reset(new LocalFileWriter(path, start_offset)); - break; - } - case TFileType::FILE_BROKER: { - file_writer.reset(new BrokerWriter(env, broker_addresses, properties, path, start_offset)); - break; - } - case TFileType::FILE_S3: { - file_writer.reset(new S3Writer(properties, path, start_offset)); - break; - } - case TFileType::FILE_HDFS: { - RETURN_IF_ERROR(create_hdfs_writer( - const_cast<std::map<std::string, std::string>&>(properties), path, file_writer)); - break; - } - default: - return Status::InternalError("unsupported file writer type: {}", std::to_string(type)); - } - - return Status::OK(); -} - -// ============================ -// broker scan node/unique ptr -Status NewFileFactory::create_file_reader(TFileType::type type, ExecEnv* env, - RuntimeProfile* profile, - const std::vector<TNetworkAddress>& broker_addresses, - const std::map<std::string, std::string>& properties, - const TBrokerRangeDesc& range, int64_t start_offset, - std::unique_ptr<FileReader>& file_reader) { - FileReader* file_reader_ptr; - switch (type) { - case TFileType::FILE_LOCAL: { - file_reader_ptr = new LocalFileReader(range.path, start_offset); - break; - } - case TFileType::FILE_BROKER: { - file_reader_ptr = new BufferedReader( - profile, - new BrokerReader(env, broker_addresses, properties, range.path, start_offset, - range.__isset.file_size ? 
range.file_size : 0)); - break; - } - case TFileType::FILE_S3: { - file_reader_ptr = - new BufferedReader(profile, new S3Reader(properties, range.path, start_offset)); - break; - } - case TFileType::FILE_HDFS: { - FileReader* hdfs_reader = nullptr; - RETURN_IF_ERROR( - create_hdfs_reader(range.hdfs_params, range.path, start_offset, &hdfs_reader)); - file_reader_ptr = new BufferedReader(profile, hdfs_reader); - break; - } - default: - return Status::InternalError("unsupported file reader type: " + std::to_string(type)); - } - file_reader.reset(file_reader_ptr); - - return Status::OK(); -} - -// ============================ -// file scan node/unique ptr -Status NewFileFactory::create_file_reader(RuntimeProfile* /*profile*/, - const FileSystemProperties& system_properties, - const FileDescription& file_description, - std::unique_ptr<io::FileSystem>* file_system, - io::FileReaderSPtr* file_reader) { - TFileType::type type = system_properties.system_type; - io::FileSystem* file_system_ptr = nullptr; - switch (type) { - case TFileType::FILE_S3: { - RETURN_IF_ERROR(create_s3_reader(system_properties.properties, file_description.path, - &file_system_ptr, file_reader)); - break; - } - case TFileType::FILE_HDFS: { - RETURN_IF_ERROR(create_hdfs_reader(system_properties.hdfs_params, file_description.path, - &file_system_ptr, file_reader)); - break; - } - case TFileType::FILE_BROKER: { - RETURN_IF_ERROR(create_broker_reader(system_properties.broker_addresses[0], - system_properties.properties, file_description.path, - &file_system_ptr, file_reader)); - break; - } - default: - return Status::NotSupported("unsupported file reader type: {}", std::to_string(type)); - } - file_system->reset(file_system_ptr); - return Status::OK(); -} - -// file scan node/stream load pipe -Status NewFileFactory::create_pipe_reader(const TUniqueId& load_id, - io::FileReaderSPtr* file_reader) { - *file_reader = ExecEnv::GetInstance()->new_load_stream_mgr()->get(load_id); - if (!(*file_reader)) { - return Status::InternalError("unknown stream load id: {}", UniqueId(load_id).to_string()); - } - return Status::OK(); -} - -Status NewFileFactory::create_hdfs_reader(const THdfsParams& hdfs_params, const std::string& path, - int64_t start_offset, FileReader** reader) { - *reader = new HdfsFileReader(hdfs_params, path, start_offset); - return Status::OK(); -} - -Status NewFileFactory::create_hdfs_reader(const THdfsParams& hdfs_params, const std::string& path, - io::FileSystem** hdfs_file_system, - io::FileReaderSPtr* reader) { - *hdfs_file_system = new io::HdfsFileSystem(hdfs_params, path); - (dynamic_cast<io::HdfsFileSystem*>(*hdfs_file_system))->connect(); - (*hdfs_file_system)->open_file(path, reader); - return Status::OK(); -} - -Status NewFileFactory::create_hdfs_writer(const std::map<std::string, std::string>& properties, - const std::string& path, - std::unique_ptr<FileWriter>& writer) { - writer.reset(new HDFSWriter(properties, path)); - return Status::OK(); -} - -Status NewFileFactory::create_s3_reader(const std::map<std::string, std::string>& prop, - const std::string& path, io::FileSystem** s3_file_system, - io::FileReaderSPtr* reader) { - S3URI s3_uri(path); - if (!s3_uri.parse()) { - return Status::InvalidArgument("s3 uri is invalid: {}", path); - } - S3Conf s3_conf; - RETURN_IF_ERROR(ClientFactory::convert_properties_to_s3_conf(prop, s3_uri, &s3_conf)); - *s3_file_system = new io::S3FileSystem(s3_conf, ""); - (dynamic_cast<io::S3FileSystem*>(*s3_file_system))->connect(); - (*s3_file_system)->open_file(s3_uri.get_key(), 
reader); - return Status::OK(); -} - -Status NewFileFactory::create_broker_reader(const TNetworkAddress& broker_addr, - const std::map<std::string, std::string>& prop, - const std::string& path, - io::FileSystem** broker_file_system, - io::FileReaderSPtr* reader) { - *broker_file_system = new io::BrokerFileSystem(broker_addr, prop); - (dynamic_cast<io::BrokerFileSystem*>(*broker_file_system))->connect(); - (*broker_file_system)->open_file(path, reader); - return Status::OK(); -} -} // namespace doris diff --git a/be/src/vec/exec/format/file_reader/new_file_factory.h b/be/src/vec/exec/format/file_reader/new_file_factory.h deleted file mode 100644 index aa510718ba..0000000000 --- a/be/src/vec/exec/format/file_reader/new_file_factory.h +++ /dev/null @@ -1,115 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. -#pragma once - -#include "gen_cpp/PlanNodes_types.h" -#include "gen_cpp/Types_types.h" -#include "io/file_reader.h" -#include "io/file_writer.h" -#include "io/fs/file_reader.h" - -namespace doris { -namespace io { -class FileSystem; -} - -class ExecEnv; -class TNetworkAddress; -class RuntimeProfile; - -struct FileSystemProperties { - TFileType::type system_type; - std::map<std::string, std::string> properties; - THdfsParams hdfs_params; - std::vector<TNetworkAddress> broker_addresses; -}; - -struct FileDescription { - std::string path; - int64_t start_offset; - size_t file_size; - size_t buffer_size; -}; - -class NewFileFactory { -public: - // Create FileWriter - static Status create_file_writer(TFileType::type type, ExecEnv* env, - const std::vector<TNetworkAddress>& broker_addresses, - const std::map<std::string, std::string>& properties, - const std::string& path, int64_t start_offset, - std::unique_ptr<FileWriter>& file_writer); - - /** - * Create FileReader for broker scan node related scanners and readers - */ - static Status create_file_reader(TFileType::type type, ExecEnv* env, RuntimeProfile* profile, - const std::vector<TNetworkAddress>& broker_addresses, - const std::map<std::string, std::string>& properties, - const TBrokerRangeDesc& range, int64_t start_offset, - std::unique_ptr<FileReader>& file_reader); - /** - * Create FileReader for file scan node rlated scanners and readers - * If buffer_size > 0, use BufferedReader to wrap the underlying FileReader; - * Otherwise, return the underlying FileReader directly. 
- */ - static Status create_file_reader(RuntimeProfile* profile, - const FileSystemProperties& system_properties, - const FileDescription& file_description, - std::unique_ptr<io::FileSystem>* file_system, - io::FileReaderSPtr* file_reader); - - // Create FileReader for stream load pipe - static Status create_pipe_reader(const TUniqueId& load_id, io::FileReaderSPtr* file_reader); - - // TODO(ftw): should be delete after new_hdfs_file_reader ready - static Status create_hdfs_reader(const THdfsParams& hdfs_params, const std::string& path, - int64_t start_offset, FileReader** reader); - - static Status create_hdfs_reader(const THdfsParams& hdfs_params, const std::string& path, - io::FileSystem** hdfs_file_system, io::FileReaderSPtr* reader); - - // TODO(ftw): should be delete after new_hdfs_file_writer ready - static Status create_hdfs_writer(const std::map<std::string, std::string>& properties, - const std::string& path, std::unique_ptr<FileWriter>& writer); - - static Status create_s3_reader(const std::map<std::string, std::string>& prop, - const std::string& path, io::FileSystem** s3_file_system, - io::FileReaderSPtr* reader); - - static Status create_broker_reader(const TNetworkAddress& broker_addr, - const std::map<std::string, std::string>& prop, - const std::string& path, io::FileSystem** hdfs_file_system, - io::FileReaderSPtr* reader); - - static TFileType::type convert_storage_type(TStorageBackendType::type type) { - switch (type) { - case TStorageBackendType::LOCAL: - return TFileType::FILE_LOCAL; - case TStorageBackendType::S3: - return TFileType::FILE_S3; - case TStorageBackendType::BROKER: - return TFileType::FILE_BROKER; - case TStorageBackendType::HDFS: - return TFileType::FILE_HDFS; - default: - LOG(FATAL) << "not match type to convert, from type:" << type; - } - __builtin_unreachable(); - } -}; -} // namespace doris diff --git a/be/src/vec/exec/format/file_reader/new_plain_binary_line_reader.cpp b/be/src/vec/exec/format/file_reader/new_plain_binary_line_reader.cpp new file mode 100644 index 0000000000..016d29ca1d --- /dev/null +++ b/be/src/vec/exec/format/file_reader/new_plain_binary_line_reader.cpp @@ -0,0 +1,71 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "new_plain_binary_line_reader.h" + +#include <gen_cpp/Types_types.h> + +#include "io/fs/file_reader.h" +#include "io/fs/stream_load_pipe.h" +#include "olap/iterators.h" + +namespace doris { + +NewPlainBinaryLineReader::NewPlainBinaryLineReader(io::FileReaderSPtr file_reader, + TFileType::type file_type) + : _file_reader(file_reader), _file_type(file_type) {} + +NewPlainBinaryLineReader::~NewPlainBinaryLineReader() { + close(); +} + +void NewPlainBinaryLineReader::close() {} + +Status NewPlainBinaryLineReader::read_line(const uint8_t** ptr, size_t* size, bool* eof) { + std::unique_ptr<uint8_t[]> file_buf; + size_t read_size = 0; + IOContext io_ctx; + io_ctx.reader_type = READER_QUERY; + switch (_file_type) { + case TFileType::FILE_LOCAL: + case TFileType::FILE_HDFS: + case TFileType::FILE_S3: { + size_t file_size = _file_reader->size(); + file_buf.reset(new uint8_t[file_size]); + Slice result(file_buf.get(), file_size); + RETURN_IF_ERROR(_file_reader->read_at(0, result, io_ctx, &read_size)); + break; + } + case TFileType::FILE_STREAM: { + RETURN_IF_ERROR((dynamic_cast<io::StreamLoadPipe*>(_file_reader.get())) + ->read_one_message(&file_buf, &read_size)); + + break; + } + default: { + return Status::NotSupported("no supported file reader type: {}", _file_type); + } + } + *ptr = file_buf.release(); + *size = read_size; + if (read_size == 0) { + *eof = true; + } + return Status::OK(); +} + +} // namespace doris diff --git a/be/src/io/hdfs_reader_writer.h b/be/src/vec/exec/format/file_reader/new_plain_binary_line_reader.h similarity index 54% rename from be/src/io/hdfs_reader_writer.h rename to be/src/vec/exec/format/file_reader/new_plain_binary_line_reader.h index 3b27b22656..b8b3f398db 100644 --- a/be/src/io/hdfs_reader_writer.h +++ b/be/src/vec/exec/format/file_reader/new_plain_binary_line_reader.h @@ -17,22 +17,27 @@ #pragma once -#include "gen_cpp/PlanNodes_types.h" -#include "io/file_reader.h" -#include "io/file_writer.h" +#include <gen_cpp/Types_types.h> + +#include "exec/line_reader.h" +#include "io/fs/file_reader.h" namespace doris { -// TODO(ftw): This file should be deleted when new_file_factory.h replace file_factory.h -class HdfsReaderWriter { +class NewPlainBinaryLineReader : public LineReader { public: - static Status create_reader(const THdfsParams& hdfs_params, const std::string& path, - int64_t start_offset, FileReader** reader); + NewPlainBinaryLineReader(io::FileReaderSPtr file_reader, TFileType::type file_type); + + ~NewPlainBinaryLineReader() override; + + Status read_line(const uint8_t** ptr, size_t* size, bool* eof) override; - static Status create_reader(const std::map<std::string, std::string>& properties, - const std::string& path, int64_t start_offset, FileReader** reader); + void close() override; - static Status create_writer(const std::map<std::string, std::string>& properties, - const std::string& path, std::unique_ptr<FileWriter>& writer); +private: + io::FileReaderSPtr _file_reader; + + TFileType::type _file_type; }; + } // namespace doris diff --git a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp index 56bc26c648..52af605154 100644 --- a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp +++ b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp @@ -33,7 +33,8 @@ namespace doris { -NewPlainTextLineReader::NewPlainTextLineReader(RuntimeProfile* profile, io::FileReader* file_reader, 
+NewPlainTextLineReader::NewPlainTextLineReader(RuntimeProfile* profile, + io::FileReaderSPtr file_reader, Decompressor* decompressor, size_t length, const std::string& line_delimiter, size_t line_delimiter_length, size_t current_offset) @@ -134,11 +135,6 @@ void NewPlainTextLineReader::extend_input_buf() { _input_buf_limit -= _input_buf_pos; _input_buf_pos = 0; } while (false); - - // LOG(INFO) << "extend input buf." - // << " input_buf_size: " << _input_buf_size - // << " input_buf_pos: " << _input_buf_pos - // << " input_buf_limit: " << _input_buf_limit; } void NewPlainTextLineReader::extend_output_buf() { @@ -176,11 +172,6 @@ void NewPlainTextLineReader::extend_output_buf() { _output_buf_limit -= _output_buf_pos; _output_buf_pos = 0; } while (false); - - // LOG(INFO) << "extend output buf." - // << " output_buf_size: " << _output_buf_size - // << " output_buf_pos: " << _output_buf_pos - // << " output_buf_limit: " << _output_buf_limit; } Status NewPlainTextLineReader::read_line(const uint8_t** ptr, size_t* size, bool* eof) { @@ -241,9 +232,12 @@ Status NewPlainTextLineReader::read_line(const uint8_t** ptr, size_t* size, bool IOContext io_ctx; RETURN_IF_ERROR( _file_reader->read_at(_current_offset, file_slice, io_ctx, &read_len)); + _current_offset += read_len; + if (read_len == 0) { + _file_eof = true; + } COUNTER_UPDATE(_bytes_read_counter, read_len); } - // LOG(INFO) << "after read file: _file_eof: " << _file_eof << " read_len: " << read_len; if (_file_eof || read_len == 0) { if (!_stream_end) { return Status::InternalError( diff --git a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.h b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.h index a39e578577..99debf6d18 100644 --- a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.h +++ b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.h @@ -18,19 +18,17 @@ #pragma once #include "exec/line_reader.h" +#include "io/fs/file_reader.h" #include "util/runtime_profile.h" namespace doris { -namespace io { -class FileReader; -} class Decompressor; class Status; class NewPlainTextLineReader : public LineReader { public: - NewPlainTextLineReader(RuntimeProfile* profile, io::FileReader* file_reader, + NewPlainTextLineReader(RuntimeProfile* profile, io::FileReaderSPtr file_reader, Decompressor* decompressor, size_t length, const std::string& line_delimiter, size_t line_delimiter_length, size_t current_offset); @@ -60,7 +58,7 @@ private: void extend_output_buf(); RuntimeProfile* _profile; - io::FileReader* _file_reader; + io::FileReaderSPtr _file_reader; Decompressor* _decompressor; // the min length that should be read. 
// -1 means endless(for stream load) diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp b/be/src/vec/exec/format/json/new_json_reader.cpp index e179f91eea..13e86b1ba9 100644 --- a/be/src/vec/exec/format/json/new_json_reader.cpp +++ b/be/src/vec/exec/format/json/new_json_reader.cpp @@ -18,12 +18,14 @@ #include "vec/exec/format/json/new_json_reader.h" #include "common/compiler_util.h" -#include "exec/plain_text_line_reader.h" #include "exprs/json_functions.h" #include "io/file_factory.h" +#include "io/fs/stream_load_pipe.h" +#include "olap/iterators.h" #include "runtime/descriptors.h" #include "runtime/runtime_state.h" #include "vec/core/block.h" +#include "vec/exec/format/file_reader/new_plain_text_line_reader.h" #include "vec/exec/scan/vscanner.h" namespace doris::vectorized { using namespace ErrorCode; @@ -38,9 +40,8 @@ NewJsonReader::NewJsonReader(RuntimeState* state, RuntimeProfile* profile, Scann _params(params), _range(range), _file_slot_descs(file_slot_descs), + _file_system(nullptr), _file_reader(nullptr), - _file_reader_s(nullptr), - _real_file_reader(nullptr), _line_reader(nullptr), _reader_eof(false), _skip_first_line(false), @@ -49,7 +50,8 @@ NewJsonReader::NewJsonReader(RuntimeState* state, RuntimeProfile* profile, Scann _value_allocator(_value_buffer, sizeof(_value_buffer)), _parse_allocator(_parse_buffer, sizeof(_parse_buffer)), _origin_json_doc(&_value_allocator, sizeof(_parse_buffer), &_parse_allocator), - _scanner_eof(scanner_eof) { + _scanner_eof(scanner_eof), + _current_offset(0) { _bytes_read_counter = ADD_COUNTER(_profile, "BytesRead", TUnit::BYTES); _read_timer = ADD_TIMER(_profile, "ReadTime"); _file_read_timer = ADD_TIMER(_profile, "FileReadTime"); @@ -64,9 +66,6 @@ NewJsonReader::NewJsonReader(RuntimeProfile* profile, const TFileScanRangeParams _params(params), _range(range), _file_slot_descs(file_slot_descs), - _file_reader(nullptr), - _file_reader_s(nullptr), - _real_file_reader(nullptr), _line_reader(nullptr), _reader_eof(false), _skip_first_line(false), @@ -159,11 +158,11 @@ Status NewJsonReader::get_parsed_schema(std::vector<std::string>* col_names, if (_line_reader != nullptr) { RETURN_IF_ERROR(_line_reader->read_line(&json_str, &size, &eof)); } else { - int64_t length = 0; - RETURN_IF_ERROR(_real_file_reader->read_one_message(&json_str_ptr, &length)); + size_t read_size = 0; + RETURN_IF_ERROR(_read_one_message(&json_str_ptr, &read_size)); json_str = json_str_ptr.get(); - size = length; - if (length == 0) { + size = read_size; + if (read_size == 0) { eof = true; } } @@ -286,15 +285,25 @@ Status NewJsonReader::_open_file_reader() { start_offset -= 1; } + _current_offset = start_offset; + + FileSystemProperties system_properties; + system_properties.system_type = _params.file_type; + system_properties.properties = _params.properties; + system_properties.hdfs_params = _params.hdfs_params; + + FileDescription file_description; + file_description.path = _range.path; + file_description.start_offset = start_offset; + file_description.file_size = _range.__isset.file_size ? 
_range.file_size : 0; + if (_params.file_type == TFileType::FILE_STREAM) { - RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, _file_reader_s)); - _real_file_reader = _file_reader_s.get(); + RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &_file_reader)); } else { RETURN_IF_ERROR(FileFactory::create_file_reader( - _profile, _params, _range.path, start_offset, _range.file_size, 0, _file_reader)); - _real_file_reader = _file_reader.get(); + _profile, system_properties, file_description, &_file_system, &_file_reader)); } - return _real_file_reader->open(); + return Status::OK(); } Status NewJsonReader::_open_line_reader() { @@ -306,8 +315,9 @@ Status NewJsonReader::_open_line_reader() { } else { _skip_first_line = false; } - _line_reader.reset(new PlainTextLineReader(_profile, _real_file_reader, nullptr, size, - _line_delimiter, _line_delimiter_length)); + _line_reader.reset(new NewPlainTextLineReader(_profile, _file_reader, nullptr, size, + _line_delimiter, _line_delimiter_length, + _current_offset)); return Status::OK(); } @@ -509,13 +519,12 @@ Status NewJsonReader::_parse_json_doc(size_t* size, bool* eof) { if (_line_reader != nullptr) { RETURN_IF_ERROR(_line_reader->read_line(&json_str, size, eof)); } else { - int64_t length = 0; - RETURN_IF_ERROR(_real_file_reader->read_one_message(&json_str_ptr, &length)); - json_str = json_str_ptr.get(); - *size = length; - if (length == 0) { + RETURN_IF_ERROR(_read_one_message(&json_str_ptr, size)); + json_str = json_str_ptr.release(); + if (*size == 0) { *eof = true; } + _current_offset += *size; } _bytes_read_counter += *size; @@ -877,4 +886,28 @@ std::string NewJsonReader::_print_json_value(const rapidjson::Value& value) { return std::string(buffer.GetString()); } +Status NewJsonReader::_read_one_message(std::unique_ptr<uint8_t[]>* file_buf, size_t* read_size) { + IOContext io_ctx; + io_ctx.reader_type = READER_QUERY; + switch (_params.file_type) { + case TFileType::FILE_LOCAL: + case TFileType::FILE_HDFS: + case TFileType::FILE_S3: { + size_t file_size = _file_reader->size(); + file_buf->reset(new uint8_t[file_size]); + Slice result(file_buf->get(), file_size); + RETURN_IF_ERROR(_file_reader->read_at(_current_offset, result, io_ctx, read_size)); + break; + } + case TFileType::FILE_STREAM: { + RETURN_IF_ERROR((dynamic_cast<io::StreamLoadPipe*>(_file_reader.get())) + ->read_one_message(file_buf, read_size)); + break; + } + default: { + return Status::NotSupported("no supported file reader type: {}", _params.file_type); + } + } + return Status::OK(); +} } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/json/new_json_reader.h b/be/src/vec/exec/format/json/new_json_reader.h index 5a057d32fe..98aae55ea4 100644 --- a/be/src/vec/exec/format/json/new_json_reader.h +++ b/be/src/vec/exec/format/json/new_json_reader.h @@ -22,6 +22,7 @@ #include <rapidjson/stringbuffer.h> #include <rapidjson/writer.h> +#include "io/fs/file_reader.h" #include "vec/exec/format/generic_reader.h" namespace doris { @@ -92,7 +93,8 @@ private: std::string _print_json_value(const rapidjson::Value& value); -private: + Status _read_one_message(std::unique_ptr<uint8_t[]>* file_buf, size_t* read_size); + Status (NewJsonReader::*_vhandle_json_callback)( std::vector<vectorized::MutableColumnPtr>& columns, const std::vector<SlotDescriptor*>& slot_descs, bool* is_empty_row, bool* eof); @@ -103,12 +105,8 @@ private: const TFileRangeDesc& _range; const std::vector<SlotDescriptor*>& _file_slot_descs; - // _file_reader_s is for stream load pipe 
reader, - // and _file_reader is for other file reader. - // TODO: refactor this to use only shared_ptr or unique_ptr - std::unique_ptr<FileReader> _file_reader; - std::shared_ptr<FileReader> _file_reader_s; - FileReader* _real_file_reader; + std::unique_ptr<io::FileSystem> _file_system; + io::FileReaderSPtr _file_reader; std::unique_ptr<LineReader> _line_reader; bool _reader_eof; @@ -134,9 +132,8 @@ private: char _value_buffer[4 * 1024 * 1024]; // 4MB char _parse_buffer[512 * 1024]; // 512KB - typedef rapidjson::GenericDocument<rapidjson::UTF8<>, rapidjson::MemoryPoolAllocator<>, - rapidjson::MemoryPoolAllocator<>> - Document; + using Document = rapidjson::GenericDocument<rapidjson::UTF8<>, rapidjson::MemoryPoolAllocator<>, + rapidjson::MemoryPoolAllocator<>>; rapidjson::MemoryPoolAllocator<> _value_allocator; rapidjson::MemoryPoolAllocator<> _parse_allocator; Document _origin_json_doc; // origin json document object from parsed json string @@ -145,6 +142,8 @@ private: bool* _scanner_eof; + size_t _current_offset; + RuntimeProfile::Counter* _bytes_read_counter; RuntimeProfile::Counter* _read_timer; RuntimeProfile::Counter* _file_read_timer; diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp index 2748160e97..c6867b0d83 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.cpp +++ b/be/src/vec/exec/format/orc/vorc_reader.cpp @@ -23,6 +23,9 @@ #include "cctz/time_zone.h" #include "gutil/strings/substitute.h" #include "io/file_factory.h" +#include "io/fs/file_reader.h" +#include "olap/iterators.h" +#include "util/slice.h" #include "vec/columns/column_array.h" #include "vec/data_types/data_type_array.h" #include "vec/data_types/data_type_nullable.h" @@ -47,10 +50,11 @@ void ORCFileInputStream::read(void* buf, uint64_t length, uint64_t offset) { SCOPED_RAW_TIMER(&_statistics.read_time); uint64_t has_read = 0; char* out = reinterpret_cast<char*>(buf); + IOContext io_ctx; while (has_read < length) { - int64_t loop_read; - Status st = _file_reader->readat(offset + has_read, length - has_read, &loop_read, - out + has_read); + size_t loop_read; + Slice result(out + has_read, length - has_read); + Status st = _file_reader->read_at(offset + has_read, result, io_ctx, &loop_read); if (!st.ok()) { throw orc::ParseError( strings::Substitute("Failed to read $0: $1", _file_name, st.to_string())); @@ -87,7 +91,8 @@ OrcReader::OrcReader(const TFileScanRangeParams& params, const TFileRangeDesc& r _scan_params(params), _scan_range(range), _ctz(ctz), - _column_names(column_names) {} + _column_names(column_names), + _file_system(nullptr) {} OrcReader::~OrcReader() { close(); @@ -132,12 +137,22 @@ Status OrcReader::init_reader( std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) { SCOPED_RAW_TIMER(&_statistics.parse_meta_time); if (_file_reader == nullptr) { - std::unique_ptr<FileReader> inner_reader; - RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, _scan_params, _scan_range.path, - _scan_range.start_offset, - _scan_range.file_size, 0, inner_reader)); - RETURN_IF_ERROR(inner_reader->open()); - _file_reader = new ORCFileInputStream(_scan_range.path, inner_reader.release()); + io::FileReaderSPtr inner_reader; + + FileSystemProperties system_properties; + system_properties.system_type = _scan_params.file_type; + system_properties.properties = _scan_params.properties; + system_properties.hdfs_params = _scan_params.hdfs_params; + + FileDescription file_description; + file_description.path = _scan_range.path; + 
file_description.start_offset = _scan_range.start_offset; + file_description.file_size = _scan_range.__isset.file_size ? _scan_range.file_size : 0; + + RETURN_IF_ERROR(FileFactory::create_file_reader( + _profile, system_properties, file_description, &_file_system, &inner_reader)); + + _file_reader = new ORCFileInputStream(_scan_range.path, inner_reader); } if (_file_reader->getLength() == 0) { return Status::EndOfFile("Empty orc file"); @@ -179,12 +194,22 @@ Status OrcReader::init_reader( Status OrcReader::get_parsed_schema(std::vector<std::string>* col_names, std::vector<TypeDescriptor>* col_types) { if (_file_reader == nullptr) { - std::unique_ptr<FileReader> inner_reader; - RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, _scan_params, _scan_range.path, - _scan_range.start_offset, - _scan_range.file_size, 0, inner_reader)); - RETURN_IF_ERROR(inner_reader->open()); - _file_reader = new ORCFileInputStream(_scan_range.path, inner_reader.release()); + io::FileReaderSPtr inner_reader; + + FileSystemProperties system_properties; + system_properties.system_type = _scan_params.file_type; + system_properties.properties = _scan_params.properties; + system_properties.hdfs_params = _scan_params.hdfs_params; + + FileDescription file_description; + file_description.path = _scan_range.path; + file_description.start_offset = _scan_range.start_offset; + file_description.file_size = _scan_range.__isset.file_size ? _scan_range.file_size : 0; + + RETURN_IF_ERROR(FileFactory::create_file_reader( + _profile, system_properties, file_description, &_file_system, &inner_reader)); + + _file_reader = new ORCFileInputStream(_scan_range.path, inner_reader); } if (_file_reader->getLength() == 0) { return Status::EndOfFile("Empty orc file"); diff --git a/be/src/vec/exec/format/orc/vorc_reader.h b/be/src/vec/exec/format/orc/vorc_reader.h index 98c2fc7c02..4f5f4dba10 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.h +++ b/be/src/vec/exec/format/orc/vorc_reader.h @@ -21,7 +21,7 @@ #include "common/config.h" #include "exec/olap_common.h" -#include "io/file_reader.h" +#include "io/fs/file_reader.h" #include "vec/core/block.h" #include "vec/data_types/data_type_decimal.h" #include "vec/exec/format/format_common.h" @@ -37,15 +37,10 @@ public: int64_t read_bytes = 0; }; - ORCFileInputStream(const std::string& file_name, FileReader* file_reader) + ORCFileInputStream(const std::string& file_name, io::FileReaderSPtr file_reader) : _file_name(file_name), _file_reader(file_reader) {}; - ~ORCFileInputStream() override { - if (_file_reader != nullptr) { - _file_reader->close(); - delete _file_reader; - } - } + ~ORCFileInputStream() override = default; uint64_t getLength() const override { return _file_reader->size(); } @@ -60,7 +55,7 @@ public: private: Statistics _statistics; const std::string& _file_name; - FileReader* _file_reader; + io::FileReaderSPtr _file_reader; }; class OrcReader : public GenericReader { @@ -82,7 +77,7 @@ public: ~OrcReader() override; // for test - void set_file_reader(const std::string& file_name, FileReader* file_reader) { + void set_file_reader(const std::string& file_name, io::FileReaderSPtr file_reader) { _file_reader = new ORCFileInputStream(file_name, file_reader); } @@ -283,6 +278,8 @@ private: orc::ReaderOptions _reader_options; orc::RowReaderOptions _row_reader_options; + std::unique_ptr<io::FileSystem> _file_system; + // only for decimal DecimalScaleParams _decimal_scale_params; }; diff --git a/be/src/vec/exec/format/parquet/parquet_thrift_util.h 
b/be/src/vec/exec/format/parquet/parquet_thrift_util.h index 5811f034bd..269f6a4368 100644 --- a/be/src/vec/exec/format/parquet/parquet_thrift_util.h +++ b/be/src/vec/exec/format/parquet/parquet_thrift_util.h @@ -23,7 +23,8 @@ #include "common/logging.h" #include "gen_cpp/parquet_types.h" -#include "io/file_reader.h" +#include "io/fs/file_reader.h" +#include "olap/iterators.h" #include "util/coding.h" #include "util/thrift_util.h" #include "vparquet_file_metadata.h" @@ -33,12 +34,14 @@ namespace doris::vectorized { constexpr uint8_t PARQUET_VERSION_NUMBER[4] = {'P', 'A', 'R', '1'}; constexpr uint32_t PARQUET_FOOTER_SIZE = 8; -static Status parse_thrift_footer(FileReader* file, std::shared_ptr<FileMetaData>& file_metadata) { +static Status parse_thrift_footer(io::FileReaderSPtr file, + std::shared_ptr<FileMetaData>& file_metadata) { uint8_t footer[PARQUET_FOOTER_SIZE]; int64_t file_size = file->size(); - int64_t bytes_read = 0; - RETURN_IF_ERROR(file->readat(file_size - PARQUET_FOOTER_SIZE, PARQUET_FOOTER_SIZE, &bytes_read, - footer)); + size_t bytes_read = 0; + Slice result(footer, PARQUET_FOOTER_SIZE); + IOContext io_ctx; + RETURN_IF_ERROR(file->read_at(file_size - PARQUET_FOOTER_SIZE, result, io_ctx, &bytes_read)); DCHECK_EQ(bytes_read, PARQUET_FOOTER_SIZE); // validate magic @@ -57,8 +60,9 @@ static Status parse_thrift_footer(FileReader* file, std::shared_ptr<FileMetaData tparquet::FileMetaData t_metadata; // deserialize footer uint8_t meta_buff[metadata_size]; - RETURN_IF_ERROR(file->readat(file_size - PARQUET_FOOTER_SIZE - metadata_size, metadata_size, - &bytes_read, meta_buff)); + Slice res(meta_buff, metadata_size); + RETURN_IF_ERROR(file->read_at(file_size - PARQUET_FOOTER_SIZE - metadata_size, res, io_ctx, + &bytes_read)); DCHECK_EQ(bytes_read, metadata_size); RETURN_IF_ERROR(deserialize_thrift_msg(meta_buff, &metadata_size, true, &t_metadata)); file_metadata.reset(new FileMetaData(t_metadata)); diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp index 75f43b4730..a7d95225b6 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp @@ -26,7 +26,7 @@ namespace doris::vectorized { -Status ParquetColumnReader::create(FileReader* file, FieldSchema* field, +Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field, const ParquetReadColumn& column, const tparquet::RowGroup& row_group, const std::vector<RowRange>& row_ranges, cctz::time_zone* ctz, @@ -80,8 +80,8 @@ void ParquetColumnReader::_generate_read_ranges(int64_t start_index, int64_t end } } -Status ScalarColumnReader::init(FileReader* file, FieldSchema* field, tparquet::ColumnChunk* chunk, - size_t max_buf_size) { +Status ScalarColumnReader::init(io::FileReaderSPtr file, FieldSchema* field, + tparquet::ColumnChunk* chunk, size_t max_buf_size) { _stream_reader = new BufferedFileStreamReader(file, _metadata->start_offset(), _metadata->size(), std::min((size_t)_metadata->size(), max_buf_size)); @@ -273,8 +273,8 @@ void ArrayColumnReader::_reserve_def_levels_buf(size_t size) { } } -Status ArrayColumnReader::init(FileReader* file, FieldSchema* field, tparquet::ColumnChunk* chunk, - size_t max_buf_size) { +Status ArrayColumnReader::init(io::FileReaderSPtr file, FieldSchema* field, + tparquet::ColumnChunk* chunk, size_t max_buf_size) { _stream_reader = new BufferedFileStreamReader(file, _metadata->start_offset(), _metadata->size(), 
std::min((size_t)_metadata->size(), max_buf_size)); diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_reader.h index de0ec185b9..0f61c5742c 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.h @@ -107,8 +107,8 @@ public: virtual Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type, ColumnSelectVector& select_vector, size_t batch_size, size_t* read_rows, bool* eof) = 0; - static Status create(FileReader* file, FieldSchema* field, const ParquetReadColumn& column, - const tparquet::RowGroup& row_group, + static Status create(io::FileReaderSPtr file, FieldSchema* field, + const ParquetReadColumn& column, const tparquet::RowGroup& row_group, const std::vector<RowRange>& row_ranges, cctz::time_zone* ctz, std::unique_ptr<ParquetColumnReader>& reader, size_t max_buf_size); void init_column_metadata(const tparquet::ColumnChunk& chunk); @@ -139,7 +139,7 @@ public: ScalarColumnReader(const std::vector<RowRange>& row_ranges, cctz::time_zone* ctz) : ParquetColumnReader(row_ranges, ctz) {}; ~ScalarColumnReader() override { close(); }; - Status init(FileReader* file, FieldSchema* field, tparquet::ColumnChunk* chunk, + Status init(io::FileReaderSPtr file, FieldSchema* field, tparquet::ColumnChunk* chunk, size_t max_buf_size); Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type, ColumnSelectVector& select_vector, size_t batch_size, size_t* read_rows, @@ -155,7 +155,7 @@ public: ArrayColumnReader(const std::vector<RowRange>& row_ranges, cctz::time_zone* ctz) : ParquetColumnReader(row_ranges, ctz) {}; ~ArrayColumnReader() override { close(); }; - Status init(FileReader* file, FieldSchema* field, tparquet::ColumnChunk* chunk, + Status init(io::FileReaderSPtr file, FieldSchema* field, tparquet::ColumnChunk* chunk, size_t max_buf_size); Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type, ColumnSelectVector& select_vector, size_t batch_size, size_t* read_rows, diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp index 24441896a9..b406919b4c 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp @@ -25,7 +25,7 @@ namespace doris::vectorized { const std::vector<int64_t> RowGroupReader::NO_DELETE = {}; -RowGroupReader::RowGroupReader(doris::FileReader* file_reader, +RowGroupReader::RowGroupReader(io::FileReaderSPtr file_reader, const std::vector<ParquetReadColumn>& read_columns, const int32_t row_group_id, const tparquet::RowGroup& row_group, cctz::time_zone* ctz, diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h b/be/src/vec/exec/format/parquet/vparquet_group_reader.h index 53e8a052c0..d53e32de2a 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h @@ -19,6 +19,7 @@ #include "exec/text_converter.h" #include "io/file_reader.h" +#include "io/fs/file_reader.h" #include "vec/core/block.h" #include "vec/exprs/vexpr_context.h" #include "vparquet_column_reader.h" @@ -99,7 +100,7 @@ public: PositionDeleteContext(const PositionDeleteContext& filter) = default; }; - RowGroupReader(doris::FileReader* file_reader, + RowGroupReader(io::FileReaderSPtr file_reader, const std::vector<ParquetReadColumn>& read_columns, const int32_t row_group_id, const tparquet::RowGroup& row_group, cctz::time_zone* 
ctz, const PositionDeleteContext& position_delete_ctx, @@ -131,7 +132,7 @@ private: Block* block, size_t rows, const std::unordered_map<std::string, VExprContext*>& missing_columns); - doris::FileReader* _file_reader; + io::FileReaderSPtr _file_reader; std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>> _column_readers; const std::vector<ParquetReadColumn>& _read_columns; const int32_t _row_group_id; diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp index 3d00f52355..ecf4d23259 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp @@ -21,6 +21,7 @@ #include "common/status.h" #include "io/file_factory.h" +#include "olap/iterators.h" #include "parquet_pred_cmp.h" #include "parquet_thrift_util.h" #include "vec/exprs/vbloom_predicate.h" @@ -135,18 +136,26 @@ void ParquetReader::close() { } Status ParquetReader::_open_file() { + FileSystemProperties system_properties; + system_properties.system_type = _scan_params.file_type; + system_properties.properties = _scan_params.properties; + system_properties.hdfs_params = _scan_params.hdfs_params; + + FileDescription file_description; + file_description.path = _scan_range.path; + file_description.start_offset = _scan_range.start_offset; + file_description.file_size = _scan_range.__isset.file_size ? _scan_range.file_size : 0; + if (_file_reader == nullptr) { - RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, _scan_params, _scan_range.path, - _scan_range.start_offset, - _scan_range.file_size, 0, _file_reader)); + RETURN_IF_ERROR(FileFactory::create_file_reader( + _profile, system_properties, file_description, &_file_system, &_file_reader)); } if (_file_metadata == nullptr) { SCOPED_RAW_TIMER(&_statistics.parse_meta_time); - RETURN_IF_ERROR(_file_reader->open()); if (_file_reader->size() == 0) { return Status::EndOfFile("Empty Parquet File"); } - RETURN_IF_ERROR(parse_thrift_footer(_file_reader.get(), _file_metadata)); + RETURN_IF_ERROR(parse_thrift_footer(_file_reader, _file_metadata)); } return Status::OK(); } @@ -403,7 +412,7 @@ Status ParquetReader::_next_row_group_reader() { RowGroupReader::PositionDeleteContext position_delete_ctx = _get_position_delete_ctx(row_group, row_group_index); - _current_group_reader.reset(new RowGroupReader(_file_reader.get(), _read_columns, + _current_group_reader.reset(new RowGroupReader(_file_reader, _read_columns, row_group_index.row_group_id, row_group, _ctz, position_delete_ctx, _lazy_read_ctx)); return _current_group_reader->init(_file_metadata->schema(), candidate_row_ranges, @@ -493,16 +502,17 @@ Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group, return Status::OK(); } uint8_t col_index_buff[page_index._column_index_size]; - int64_t bytes_read = 0; - RETURN_IF_ERROR(_file_reader->readat(page_index._column_index_start, - page_index._column_index_size, &bytes_read, - col_index_buff)); + size_t bytes_read = 0; + Slice result(col_index_buff, page_index._column_index_size); + IOContext io_ctx; + RETURN_IF_ERROR( + _file_reader->read_at(page_index._column_index_start, result, io_ctx, &bytes_read)); auto& schema_desc = _file_metadata->schema(); std::vector<RowRange> skipped_row_ranges; uint8_t off_index_buff[page_index._offset_index_size]; - RETURN_IF_ERROR(_file_reader->readat(page_index._offset_index_start, - page_index._offset_index_size, &bytes_read, - off_index_buff)); + Slice res(off_index_buff, page_index._offset_index_size); + 
RETURN_IF_ERROR( + _file_reader->read_at(page_index._offset_index_start, res, io_ctx, &bytes_read)); for (auto& read_col : _read_columns) { auto conjunct_iter = _colname_to_value_range->find(read_col._file_slot_name); if (_colname_to_value_range->end() == conjunct_iter) { diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h index 2bfc74f823..2f2d37aec5 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_reader.h @@ -26,7 +26,8 @@ #include "common/status.h" #include "exec/olap_common.h" #include "gen_cpp/parquet_types.h" -#include "io/file_reader.h" +#include "io/fs/file_reader.h" +#include "io/fs/file_system.h" #include "vec/core/block.h" #include "vec/exec/format/generic_reader.h" #include "vec/exprs/vexpr_context.h" @@ -61,7 +62,7 @@ public: ~ParquetReader() override; // for test - void set_file_reader(FileReader* file_reader) { _file_reader.reset(file_reader); } + void set_file_reader(io::FileReaderSPtr file_reader) { _file_reader = file_reader; } Status init_reader(const std::vector<std::string>& column_names, bool filter_groups = true) { // without predicate @@ -151,7 +152,8 @@ private: RuntimeProfile* _profile; const TFileScanRangeParams& _scan_params; const TFileRangeDesc& _scan_range; - std::unique_ptr<FileReader> _file_reader = nullptr; + std::unique_ptr<io::FileSystem> _file_system = nullptr; + io::FileReaderSPtr _file_reader = nullptr; std::shared_ptr<FileMetaData> _file_metadata; const tparquet::FileMetaData* _t_metadata; std::unique_ptr<RowGroupReader> _current_group_reader; diff --git a/be/test/runtime/routine_load_task_executor_test.cpp b/be/test/runtime/routine_load_task_executor_test.cpp index 2e5a1e93c6..4cc3122714 100644 --- a/be/test/runtime/routine_load_task_executor_test.cpp +++ b/be/test/runtime/routine_load_task_executor_test.cpp @@ -23,7 +23,7 @@ #include "gen_cpp/FrontendService_types.h" #include "gen_cpp/HeartbeatService_types.h" #include "runtime/exec_env.h" -#include "runtime/stream_load/load_stream_mgr.h" +#include "runtime/stream_load/new_load_stream_mgr.h" #include "runtime/stream_load/stream_load_executor.h" #include "util/cpu_info.h" @@ -38,8 +38,8 @@ extern TStreamLoadPutResult k_stream_load_put_result; class RoutineLoadTaskExecutorTest : public testing::Test { public: - RoutineLoadTaskExecutorTest() {} - virtual ~RoutineLoadTaskExecutorTest() {} + RoutineLoadTaskExecutorTest() = default; + ~RoutineLoadTaskExecutorTest() override = default; void SetUp() override { k_stream_load_begin_result = TLoadTxnBeginResult(); @@ -47,24 +47,20 @@ public: k_stream_load_rollback_result = TLoadTxnRollbackResult(); k_stream_load_put_result = TStreamLoadPutResult(); - _env._master_info = new TMasterInfo(); - _env._load_stream_mgr = new LoadStreamMgr(); - _env._stream_load_executor = new StreamLoadExecutor(&_env); + _env.set_master_info(new TMasterInfo()); + _env.set_new_load_stream_mgr(new NewLoadStreamMgr()); + _env.set_stream_load_executor(new StreamLoadExecutor(&_env)); config::routine_load_thread_pool_size = 5; config::max_consumer_num_per_group = 3; } void TearDown() override { - delete _env._master_info; - _env._master_info = nullptr; - delete _env._load_stream_mgr; - _env._load_stream_mgr = nullptr; - delete _env._stream_load_executor; - _env._stream_load_executor = nullptr; + delete _env.master_info(); + delete _env.new_load_stream_mgr(); + delete _env.stream_load_executor(); } -private: ExecEnv _env; }; diff --git 
a/be/test/vec/exec/parquet/parquet_reader_test.cpp b/be/test/vec/exec/parquet/parquet_reader_test.cpp index 1424cf3203..319feb0b8c 100644 --- a/be/test/vec/exec/parquet/parquet_reader_test.cpp +++ b/be/test/vec/exec/parquet/parquet_reader_test.cpp @@ -18,7 +18,7 @@ #include <glog/logging.h> #include <gtest/gtest.h> -#include "io/local_file_reader.h" +#include "io/fs/local_file_system.h" #include "runtime/runtime_state.h" #include "util/runtime_profile.h" #include "vec/data_types/data_type_factory.hpp" @@ -89,8 +89,9 @@ TEST_F(ParquetReaderTest, normal) { DescriptorTbl::create(&obj_pool, t_desc_table, &desc_tbl); auto slot_descs = desc_tbl->get_tuple_descriptor(0)->slots(); - LocalFileReader* reader = - new LocalFileReader("./be/test/exec/test_data/parquet_scanner/type-decoder.parquet", 0); + io::FileSystemSPtr local_fs = std::make_shared<io::LocalFileSystem>(""); + io::FileReaderSPtr reader; + local_fs->open_file("./be/test/exec/test_data/parquet_scanner/type-decoder.parquet", &reader); cctz::time_zone ctz; TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, ctz); diff --git a/be/test/vec/exec/parquet/parquet_thrift_test.cpp b/be/test/vec/exec/parquet/parquet_thrift_test.cpp index d1f59e9234..f0712cf7c5 100644 --- a/be/test/vec/exec/parquet/parquet_thrift_test.cpp +++ b/be/test/vec/exec/parquet/parquet_thrift_test.cpp @@ -24,8 +24,8 @@ #include "exec/schema_scanner.h" #include "io/buffered_reader.h" -#include "io/file_reader.h" -#include "io/local_file_reader.h" +#include "io/fs/local_file_system.h" +#include "olap/iterators.h" #include "runtime/string_value.h" #include "util/runtime_profile.h" #include "util/timezone_utils.h" @@ -47,13 +47,14 @@ public: }; TEST_F(ParquetThriftReaderTest, normal) { - LocalFileReader reader("./be/test/exec/test_data/parquet_scanner/localfile.parquet", 0); - - auto st = reader.open(); + io::FileSystemSPtr local_fs = std::make_shared<io::LocalFileSystem>(""); + io::FileReaderSPtr reader; + auto st = local_fs->open_file("./be/test/exec/test_data/parquet_scanner/localfile.parquet", + &reader); EXPECT_TRUE(st.ok()); std::shared_ptr<FileMetaData> meta_data; - parse_thrift_footer(&reader, meta_data); + parse_thrift_footer(reader, meta_data); tparquet::FileMetaData t_metadata = meta_data->to_thrift(); LOG(WARNING) << "====================================="; @@ -77,13 +78,15 @@ TEST_F(ParquetThriftReaderTest, complex_nested_file) { // `hobby` array<map<string,string>>, // `friend` map<string,string>, // `mark` struct<math:int,english:int>) - LocalFileReader reader("./be/test/exec/test_data/parquet_scanner/hive-complex.parquet", 0); - auto st = reader.open(); + io::FileSystemSPtr local_fs = std::make_shared<io::LocalFileSystem>(""); + io::FileReaderSPtr reader; + auto st = local_fs->open_file("./be/test/exec/test_data/parquet_scanner/hive-complex.parquet", + &reader); EXPECT_TRUE(st.ok()); std::shared_ptr<FileMetaData> metadata; - parse_thrift_footer(&reader, metadata); + parse_thrift_footer(reader, metadata); tparquet::FileMetaData t_metadata = metadata->to_thrift(); FieldDescriptor schemaDescriptor; schemaDescriptor.parse_from_thrift(t_metadata.schema); @@ -146,7 +149,7 @@ static int fill_nullable_column(ColumnPtr& doris_column, level_t* definitions, s return null_cnt; } -static Status get_column_values(FileReader* file_reader, tparquet::ColumnChunk* column_chunk, +static Status get_column_values(io::FileReaderSPtr file_reader, tparquet::ColumnChunk* column_chunk, FieldSchema* field_schema, ColumnPtr& doris_column, DataTypePtr& data_type, level_t* 
definitions) { tparquet::ColumnMetaData chunk_meta = column_chunk->meta_data; @@ -280,14 +283,15 @@ static void read_parquet_data_and_check(const std::string& parquet_file, * `list_string` array<string>) // 14 */ - LocalFileReader reader(parquet_file, 0); - auto st = reader.open(); + io::FileSystemSPtr local_fs = std::make_shared<io::LocalFileSystem>(""); + io::FileReaderSPtr reader; + auto st = local_fs->open_file(parquet_file, &reader); EXPECT_TRUE(st.ok()); std::unique_ptr<vectorized::Block> block; create_block(block); std::shared_ptr<FileMetaData> metadata; - parse_thrift_footer(&reader, metadata); + parse_thrift_footer(reader, metadata); tparquet::FileMetaData t_metadata = metadata->to_thrift(); FieldDescriptor schema_descriptor; schema_descriptor.parse_from_thrift(t_metadata.schema); @@ -297,7 +301,7 @@ static void read_parquet_data_and_check(const std::string& parquet_file, auto& column_name_with_type = block->get_by_position(c); auto& data_column = column_name_with_type.column; auto& data_type = column_name_with_type.type; - get_column_values(&reader, &t_metadata.row_groups[0].columns[c], + get_column_values(reader, &t_metadata.row_groups[0].columns[c], const_cast<FieldSchema*>(schema_descriptor.get_column(c)), data_column, data_type, defs); } @@ -306,7 +310,7 @@ static void read_parquet_data_and_check(const std::string& parquet_file, auto& column_name_with_type = block->get_by_position(14); auto& data_column = column_name_with_type.column; auto& data_type = column_name_with_type.type; - get_column_values(&reader, &t_metadata.row_groups[0].columns[13], + get_column_values(reader, &t_metadata.row_groups[0].columns[13], const_cast<FieldSchema*>(schema_descriptor.get_column(13)), data_column, data_type, defs); } @@ -315,19 +319,20 @@ static void read_parquet_data_and_check(const std::string& parquet_file, auto& column_name_with_type = block->get_by_position(15); auto& data_column = column_name_with_type.column; auto& data_type = column_name_with_type.type; - get_column_values(&reader, &t_metadata.row_groups[0].columns[9], + get_column_values(reader, &t_metadata.row_groups[0].columns[9], const_cast<FieldSchema*>(schema_descriptor.get_column(9)), data_column, data_type, defs); } - LocalFileReader result(result_file, 0); - auto rst = result.open(); + io::FileReaderSPtr result; + auto rst = local_fs->open_file(result_file, &result); EXPECT_TRUE(rst.ok()); - uint8_t result_buf[result.size() + 1]; - result_buf[result.size()] = '\0'; - int64_t bytes_read; - bool eof; - result.read(result_buf, result.size(), &bytes_read, &eof); + uint8_t result_buf[result->size() + 1]; + result_buf[result->size()] = '\0'; + size_t bytes_read; + Slice res(result_buf, result->size()); + IOContext io_ctx; + result->read_at(0, res, io_ctx, &bytes_read); ASSERT_STREQ(block->dump_data(0, rows).c_str(), reinterpret_cast<char*>(result_buf)); } @@ -400,13 +405,15 @@ TEST_F(ParquetThriftReaderTest, group_reader) { lazy_read_ctx.all_read_columns.emplace_back(slot->col_name()); read_columns.emplace_back(ParquetReadColumn(7, slot->col_name())); } - LocalFileReader file_reader("./be/test/exec/test_data/parquet_scanner/type-decoder.parquet", 0); - auto st = file_reader.open(); + io::FileSystemSPtr local_fs = std::make_shared<io::LocalFileSystem>(""); + io::FileReaderSPtr file_reader; + auto st = local_fs->open_file("./be/test/exec/test_data/parquet_scanner/type-decoder.parquet", + &file_reader); EXPECT_TRUE(st.ok()); // prepare metadata std::shared_ptr<FileMetaData> meta_data; - parse_thrift_footer(&file_reader, meta_data); 
+ parse_thrift_footer(file_reader, meta_data); tparquet::FileMetaData t_metadata = meta_data->to_thrift(); cctz::time_zone ctz; @@ -414,10 +421,11 @@ TEST_F(ParquetThriftReaderTest, group_reader) { auto row_group = t_metadata.row_groups[0]; std::shared_ptr<RowGroupReader> row_group_reader; RowGroupReader::PositionDeleteContext position_delete_ctx(row_group.num_rows, 0); - row_group_reader.reset(new RowGroupReader(&file_reader, read_columns, 0, row_group, &ctz, + row_group_reader.reset(new RowGroupReader(file_reader, read_columns, 0, row_group, &ctz, position_delete_ctx, lazy_read_ctx)); std::vector<RowRange> row_ranges; row_ranges.emplace_back(0, row_group.num_rows); + auto col_offsets = std::unordered_map<int, tparquet::OffsetIndex>(); auto stg = row_group_reader->init(meta_data->schema(), row_ranges, col_offsets); EXPECT_TRUE(stg.ok()); @@ -435,14 +443,16 @@ TEST_F(ParquetThriftReaderTest, group_reader) { auto stb = row_group_reader->next_batch(&block, 1024, &read_rows, &batch_eof); EXPECT_TRUE(stb.ok()); - LocalFileReader result("./be/test/exec/test_data/parquet_scanner/group-reader.txt", 0); - auto rst = result.open(); + io::FileReaderSPtr result; + auto rst = local_fs->open_file("./be/test/exec/test_data/parquet_scanner/group-reader.txt", + &result); EXPECT_TRUE(rst.ok()); - uint8_t result_buf[result.size() + 1]; - result_buf[result.size()] = '\0'; - int64_t bytes_read; - bool eof; - result.read(result_buf, result.size(), &bytes_read, &eof); + uint8_t result_buf[result->size() + 1]; + result_buf[result->size()] = '\0'; + size_t bytes_read; + Slice res(result_buf, result->size()); + IOContext io_ctx; + result->read_at(0, res, io_ctx, &bytes_read); ASSERT_STREQ(block.dump_data(0, 10).c_str(), reinterpret_cast<char*>(result_buf)); } } // namespace vectorized diff --git a/regression-test/data/load_p0/stream_load/test_json_load.out b/regression-test/data/load_p0/stream_load/test_json_load.out index ebf706f594..d0de96b7d7 100644 --- a/regression-test/data/load_p0/stream_load/test_json_load.out +++ b/regression-test/data/load_p0/stream_load/test_json_load.out @@ -12,32 +12,6 @@ 10 hefei 23456710 200 changsha 3456789 --- !select1 -- -1 beijing 2345671 -2 shanghai 2345672 -3 guangzhou 2345673 -4 shenzhen 2345674 -5 hangzhou 2345675 -6 nanjing 2345676 -7 wuhan 2345677 -8 chengdu 2345678 -9 xian 2345679 -10 hefei 23456710 -200 changsha 3456789 - --- !select2 -- -10 beijing 2345671 -20 shanghai 2345672 -30 guangzhou 2345673 -40 shenzhen 2345674 -50 hangzhou 2345675 -60 nanjing 2345676 -70 wuhan 2345677 -80 chengdu 2345678 -90 xian 2345679 -100 hefei 23456710 -200 changsha 3456789 - -- !select2 -- 10 beijing 2345671 20 shanghai 2345672 @@ -64,32 +38,6 @@ 10 23456710 200 755 --- !select3 -- -1 2345671 -2 2345672 -3 2345673 -4 2345674 -5 2345675 -6 2345676 -7 2345677 -8 2345678 -9 2345679 -10 23456710 -200 755 - --- !select4 -- -1 210 -2 220 -3 230 -4 240 -5 250 -6 260 -7 270 -8 280 -9 290 -10 300 -200 755 - -- !select4 -- 1 210 2 220 @@ -116,32 +64,6 @@ 10 2345676 200 755 --- !select5 -- -1 1454547 -2 1244264 -3 528369 -4 594201 -5 594201 -6 2345672 -7 2345673 -8 2345674 -9 2345675 -10 2345676 -200 755 - --- !select6 -- -10 1454547 -20 1244264 -30 528369 -40 594201 -50 594201 -60 2345672 -70 2345673 -80 2345674 -90 2345675 -100 2345676 -200 755 - -- !select6 -- 10 1454547 20 1244264 @@ -163,22 +85,6 @@ 100 2345676 200 755 --- !select7 -- -60 2345672 -70 2345673 -80 2345674 -90 2345675 -100 2345676 -200 755 - --- !select8 -- -60 2345672 -70 2345673 -80 2345674 -90 2345675 -100 2345676 -200 755 - -- 
!select8 -- 60 2345672 70 2345673 @@ -195,33 +101,9 @@ 50 guangzhou 2345675 200 changsha 3456789 --- !select9 -- -10 beijing 2345671 -20 shanghai 2345672 -30 hangzhou 2345673 -40 shenzhen 2345674 -50 guangzhou 2345675 -200 changsha 3456789 - -- !select10 -- 200 changsha 3456789 --- !select10 -- -200 changsha 3456789 - --- !select11 -- -1 beijing 2345671 -2 shanghai 2345672 -3 guangzhou 2345673 -4 shenzhen 2345674 -5 hangzhou 2345675 -6 nanjing 2345676 -7 wuhan 2345677 -8 chengdu 2345678 -9 xian 2345679 -10 hefei 23456710 -200 changsha 3456789 - -- !select11 -- 1 beijing 2345671 2 shanghai 2345672 @@ -248,28 +130,6 @@ 10 \N 23456710 200 changsha 3456789 --- !select12 -- -1 \N 2345671 -2 shanghai 2345672 -3 beijing 2345673 -4 shenzhen 2345674 -5 hangzhou 2345675 -6 nanjing 2345676 -7 \N 2345677 -8 chengdu 2345678 -9 \N 2345679 -10 \N 23456710 -200 changsha 3456789 - --- !select13 -- -2 shanghai 2345672 -3 beijing 2345673 -4 shenzhen 2345674 -5 hangzhou 2345675 -6 nanjing 2345676 -8 chengdu 2345678 -200 hangzhou 12345 - -- !select13 -- 2 shanghai 2345672 3 beijing 2345673 @@ -287,22 +147,6 @@ 50 2345675 \N 200 changsha 3456789 --- !select14 -- -10 2345671 \N -20 2345672 \N -30 2345673 \N -40 2345674 \N -50 2345675 \N -200 changsha 3456789 - --- !select15 -- -10 beijing 2345671 -20 shanghai 2345672 -30 hangzhou 2345673 -40 shenzhen 2345674 -50 guangzhou 2345675 -200 changsha 3456789 - -- !select15 -- 10 beijing 2345671 20 shanghai 2345672 @@ -320,12 +164,3 @@ 6 fuyang 2345676 200 changsha 3456789 --- !select16 -- -1 xihu 2345671 -2 xiaoshan 2345672 -3 binjiang 2345673 -4 shangcheng 2345674 -5 tonglu 2345675 -6 fuyang 2345676 -200 changsha 3456789 - diff --git a/regression-test/suites/load_p0/stream_load/test_json_load.groovy b/regression-test/suites/load_p0/stream_load/test_json_load.groovy index 513d3e14dd..d2c5d15fe0 100644 --- a/regression-test/suites/load_p0/stream_load/test_json_load.groovy +++ b/regression-test/suites/load_p0/stream_load/test_json_load.groovy @@ -114,7 +114,7 @@ suite("test_json_load", "p0") { assertTrue(result1[0][0] == 0, "Create table should update 0 rows") } - def load_json_data = {new_json_reader_flag, label, strip_flag, read_flag, format_flag, exprs, json_paths, + def load_json_data = {label, strip_flag, read_flag, format_flag, exprs, json_paths, json_root, where_expr, fuzzy_flag, file_name, ignore_failure=false -> // load the json data @@ -207,17 +207,7 @@ suite("test_json_load", "p0") { create_test_table1.call(testTable) - load_json_data.call('false', 'test_json_load_case1', 'true', '', 'json', '', '', '', '', '', 'simple_json.json') - - sql "sync" - qt_select1 "select * from ${testTable} order by id" - - // test new json reader - sql "DROP TABLE IF EXISTS ${testTable}" - - create_test_table1.call(testTable) - - load_json_data.call('true', 'test_json_load_case1_2', 'true', '', 'json', '', '', '', '', '', 'simple_json.json') + load_json_data.call('test_json_load_case1_2', 'true', '', 'json', '', '', '', '', '', 'simple_json.json') sql "sync" qt_select1 "select * from ${testTable} order by id" @@ -232,17 +222,7 @@ suite("test_json_load", "p0") { create_test_table1.call(testTable) - load_json_data.call('false', 'test_json_load_case2', 'true', '', 'json', 'id= id * 10', '', '', '', '', 'simple_json.json') - - sql "sync" - qt_select2 "select * from ${testTable} order by id" - - // test new json reader - sql "DROP TABLE IF EXISTS ${testTable}" - - create_test_table1.call(testTable) - - load_json_data.call('true', 'test_json_load_case2_2', 'true', '', 'json', 'id= id * 
10', '', '', '', '', 'simple_json.json') + load_json_data.call('test_json_load_case2_2', 'true', '', 'json', 'id= id * 10', '', '', '', '', 'simple_json.json') sql "sync" qt_select2 "select * from ${testTable} order by id" @@ -257,18 +237,7 @@ suite("test_json_load", "p0") { create_test_table2.call(testTable) - load_json_data.call('false', 'test_json_load_case3', 'true', '', 'json', '', '[\"$.id\", \"$.code\"]', - '', '', '', 'simple_json.json') - - sql "sync" - qt_select3 "select * from ${testTable} order by id" - - // test new json reader - sql "DROP TABLE IF EXISTS ${testTable}" - - create_test_table2.call(testTable) - - load_json_data.call('true', 'test_json_load_case3_2', 'true', '', 'json', '', '[\"$.id\", \"$.code\"]', + load_json_data.call('test_json_load_case3_2', 'true', '', 'json', '', '[\"$.id\", \"$.code\"]', '', '', '', 'simple_json.json') sql "sync" @@ -284,18 +253,7 @@ suite("test_json_load", "p0") { create_test_table2.call(testTable) - load_json_data.call('false', 'test_json_load_case4', 'true', '', 'json', 'code = id * 10 + 200', '[\"$.id\"]', - '', '', '', 'simple_json.json') - - sql "sync" - qt_select4 "select * from ${testTable} order by id" - - // test new json reader - sql "DROP TABLE IF EXISTS ${testTable}" - - create_test_table2.call(testTable) - - load_json_data.call('true', 'test_json_load_case4_2', 'true', '', 'json', 'code = id * 10 + 200', '[\"$.id\"]', + load_json_data.call('test_json_load_case4_2', 'true', '', 'json', 'code = id * 10 + 200', '[\"$.id\"]', '', '', '', 'simple_json.json') sql "sync" @@ -311,18 +269,7 @@ suite("test_json_load", "p0") { create_test_table2.call(testTable) - load_json_data.call('false', 'test_json_load_case5', 'true', 'true', 'json', '', '[\"$.id\", \"$.code\"]', - '', '', '', 'multi_line_json.json') - - sql "sync" - qt_select5 "select * from ${testTable} order by id" - - // test new json reader - sql "DROP TABLE IF EXISTS ${testTable}" - - create_test_table2.call(testTable) - - load_json_data.call('true', 'test_json_load_case5_2', 'true', 'true', 'json', '', '[\"$.id\", \"$.code\"]', + load_json_data.call('test_json_load_case5_2', 'true', 'true', 'json', '', '[\"$.id\", \"$.code\"]', '', '', '', 'multi_line_json.json') sql "sync" @@ -338,19 +285,7 @@ suite("test_json_load", "p0") { create_test_table2.call(testTable) - load_json_data.call('false', 'test_json_load_case6', 'true', 'true', 'json', 'id= id * 10', '[\"$.id\", \"$.code\"]', - '', '', '', 'multi_line_json.json') - - sql "sync" - qt_select6 "select * from ${testTable} order by id" - - - // test new json reader - sql "DROP TABLE IF EXISTS ${testTable}" - - create_test_table2.call(testTable) - - load_json_data.call('true', 'test_json_load_case6_2', 'true', 'true', 'json', 'id= id * 10', '[\"$.id\", \"$.code\"]', + load_json_data.call('test_json_load_case6_2', 'true', 'true', 'json', 'id= id * 10', '[\"$.id\", \"$.code\"]', '', '', '', 'multi_line_json.json') sql "sync" @@ -366,18 +301,7 @@ suite("test_json_load", "p0") { create_test_table2.call(testTable) - load_json_data.call('false', 'test_json_load_case7', 'true', 'true', 'json', 'id= id * 10', '[\"$.id\", \"$.code\"]', - '', 'id > 50', '', 'multi_line_json.json') - - sql "sync" - qt_select7 "select * from ${testTable} order by id" - - // test new json reader - sql "DROP TABLE IF EXISTS ${testTable}" - - create_test_table2.call(testTable) - - load_json_data.call('true', 'test_json_load_case7_2', 'true', 'true', 'json', 'id= id * 10', '[\"$.id\", \"$.code\"]', + load_json_data.call('test_json_load_case7_2', 'true', 
'true', 'json', 'id= id * 10', '[\"$.id\", \"$.code\"]', '', 'id > 50', '', 'multi_line_json.json') sql "sync" @@ -393,19 +317,7 @@ suite("test_json_load", "p0") { create_test_table2.call(testTable) - load_json_data.call('false', 'test_json_load_case8', 'true', 'true', 'json', 'id= id * 10', '[\"$.id\", \"$.code\"]', - '', 'id > 50', 'true', 'multi_line_json.json') - - sql "sync" - qt_select8 "select * from ${testTable} order by id" - - - // test new json reader - sql "DROP TABLE IF EXISTS ${testTable}" - - create_test_table2.call(testTable) - - load_json_data.call('true', 'test_json_load_case8_2', 'true', 'true', 'json', 'id= id * 10', '[\"$.id\", \"$.code\"]', + load_json_data.call('test_json_load_case8_2', 'true', 'true', 'json', 'id= id * 10', '[\"$.id\", \"$.code\"]', '', 'id > 50', 'true', 'multi_line_json.json') sql "sync" @@ -421,18 +333,7 @@ suite("test_json_load", "p0") { create_test_table1.call(testTable) - load_json_data.call('false', 'test_json_load_case9', '', 'true', 'json', 'id= id * 10', '', - '$.item', '', 'true', 'nest_json.json') - - sql "sync" - qt_select9 "select * from ${testTable} order by id" - - // test new json reader - sql "DROP TABLE IF EXISTS ${testTable}" - - create_test_table1.call(testTable) - - load_json_data.call('true', 'test_json_load_case9_2', '', 'true', 'json', 'id= id * 10', '', + load_json_data.call('test_json_load_case9_2', '', 'true', 'json', 'id= id * 10', '', '$.item', '', 'true', 'nest_json.json') sql "sync" @@ -448,19 +349,7 @@ suite("test_json_load", "p0") { create_test_table1.call(testTable) - load_json_data.call('false', 'test_json_load_case10', '', 'true', 'json', 'id= id * 10', '', - '$.item', '', 'true', 'invalid_json.json', true) - - sql "sync" - qt_select10 "select * from ${testTable} order by id" - - - // test new json reader - sql "DROP TABLE IF EXISTS ${testTable}" - - create_test_table1.call(testTable) - - load_json_data.call('true', 'test_json_load_case10_2', '', 'true', 'json', 'id= id * 10', '', + load_json_data.call('test_json_load_case10_2', '', 'true', 'json', 'id= id * 10', '', '$.item', '', 'true', 'invalid_json.json', true) sql "sync" @@ -476,17 +365,7 @@ suite("test_json_load", "p0") { create_test_table1.call(testTable) - load_json_data.call('false', 'test_json_load_case11', 'true', '', 'json', '', '', '', '', '', 'simple_json2.json') - - sql "sync" - qt_select11 "select * from ${testTable} order by id" - - // test new json reader - sql "DROP TABLE IF EXISTS ${testTable}" - - create_test_table1.call(testTable) - - load_json_data.call('true', 'test_json_load_case11_2', 'true', '', 'json', '', '', '', '', '', 'simple_json2.json') + load_json_data.call('test_json_load_case11_2', 'true', '', 'json', '', '', '', '', '', 'simple_json2.json') sql "sync" qt_select11 "select * from ${testTable} order by id" @@ -501,17 +380,7 @@ suite("test_json_load", "p0") { create_test_table1.call(testTable) - load_json_data.call('false', 'test_json_load_case12', 'true', '', 'json', '', '', '', '', '', 'simple_json2_lack_one_column.json') - - sql "sync" - qt_select12 "select * from ${testTable} order by id" - - // test new json reader - sql "DROP TABLE IF EXISTS ${testTable}" - - create_test_table1.call(testTable) - - load_json_data.call('true', 'test_json_load_case12_2', 'true', '', 'json', '', '', '', '', '', 'simple_json2_lack_one_column.json') + load_json_data.call('test_json_load_case12_2', 'true', '', 'json', '', '', '', '', '', 'simple_json2_lack_one_column.json') sql "sync" qt_select12 "select * from ${testTable} order by id" @@ -553,38 
+422,6 @@ suite("test_json_load", "p0") { sql "sync" qt_select13 "select * from ${testTable} order by id" - - sql "DROP TABLE IF EXISTS ${testTable}" - create_test_table3.call(testTable) - // load the json data - streamLoad { - table "${testTable}" - - // set http request header params - set 'strip_outer_array', "true" - set 'format', "json" - set 'max_filter_ratio', '1' - file "simple_json2_lack_one_column.json" // import json file - time 10000 // limit inflight 10s - - // if declared a check callback, the default check condition will ignore. - // So you must check all condition - check { result, exception, startTime, endTime -> - if (exception != null) { - throw exception - } - log.info("Stream load result: ${result}".toString()) - def json = parseJson(result) - assertEquals("success", json.Status.toLowerCase()) - assertEquals(json.NumberTotalRows, json.NumberLoadedRows + json.NumberUnselectedRows + json.NumberFilteredRows) - assertEquals(json.NumberFilteredRows, 4) - assertEquals(json.NumberLoadedRows, 6) - assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0) - } - } - sql "sync" - qt_select13 "select * from ${testTable} order by id" - } finally { try_sql("DROP TABLE IF EXISTS ${testTable}") } @@ -595,18 +432,7 @@ suite("test_json_load", "p0") { create_test_table1.call(testTable) - load_json_data.call('false', 'test_json_load_case14', '', 'true', 'json', 'id= id * 10', '[\"$.id\", \"$.code\"]', - '$.item', '', 'true', 'nest_json.json') - - sql "sync" - qt_select14 "select * from ${testTable} order by id" - - // test new json reader - sql "DROP TABLE IF EXISTS ${testTable}" - - create_test_table1.call(testTable) - - load_json_data.call('true', 'test_json_load_case14_2', '', 'true', 'json', 'id= id * 10', '[\"$.id\", \"$.code\"]', + load_json_data.call('test_json_load_case14_2', '', 'true', 'json', 'id= id * 10', '[\"$.id\", \"$.code\"]', '$.item', '', 'true', 'nest_json.json') sql "sync" @@ -622,18 +448,7 @@ suite("test_json_load", "p0") { create_test_table1.call(testTable) - load_json_data.call('false', 'test_json_load_case15', '', 'true', 'json', 'id, code, city, id= id * 10', - '[\"$.id\", \"$.code\", \"$.city\"]', '$.item', '', 'true', 'nest_json.json') - - sql "sync" - qt_select15 "select * from ${testTable} order by id" - - // test new json reader - sql "DROP TABLE IF EXISTS ${testTable}" - - create_test_table1.call(testTable) - - load_json_data.call('true', 'test_json_load_case15_2', '', 'true', 'json', 'id, code, city,id= id * 10', + load_json_data.call('test_json_load_case15_2', '', 'true', 'json', 'id, code, city,id= id * 10', '[\"$.id\", \"$.code\", \"$.city\"]', '$.item', '', 'true', 'nest_json.json') sql "sync" @@ -649,18 +464,7 @@ suite("test_json_load", "p0") { create_test_table1.call(testTable) - load_json_data.call('false', 'test_json_load_case16', 'true', '', 'json', 'id, code, city', - '[\"$.id\", \"$.code\", \"$.city[2]\"]', '$.item', '', 'true', 'nest_json_array.json') - - sql "sync" - qt_select16 "select * from ${testTable} order by id" - - // test new json reader - sql "DROP TABLE IF EXISTS ${testTable}" - - create_test_table1.call(testTable) - - load_json_data.call('true', 'test_json_load_case16_2', 'true', '', 'json', 'id, code, city', + load_json_data.call('test_json_load_case16_2', 'true', '', 'json', 'id, code, city', '[\"$.id\", \"$.code\", \"$.city[2]\"]', '$.item', '', 'true', 'nest_json_array.json') sql "sync" @@ -714,7 +518,7 @@ suite("test_json_load", "p0") { sql "DROP TABLE IF EXISTS ${testTable}" test_invalid_json_array_table.call(testTable) - 
load_json_data.call('false', 'test_json_load_case19', 'true', '', 'json', '', '',
+        load_json_data.call('test_json_load_case19', 'true', '', 'json', '', '',
                             '', '', '', 'invalid_json_array.json', true)

         sql "sync"

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
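
For context on the test_json_load.groovy changes above: the load_json_data helper drops its leading new_json_reader_flag argument, since the new file reader is now the only read path, and the duplicated "test new json reader" blocks are removed. A minimal Groovy sketch of the call pattern after this commit, using only the closure signature and one invocation visible in the diff (the streamLoad body inside the closure is unchanged and elided here):

    // Helper signature after this commit: the first argument is now the load label.
    def load_json_data = {label, strip_flag, read_flag, format_flag, exprs, json_paths,
                          json_root, where_expr, fuzzy_flag, file_name, ignore_failure=false ->
        // streamLoad { ... } body unchanged from the suite; elided in this sketch
    }

    // Before: load_json_data.call('false', 'test_json_load_case1', 'true', '', 'json', '', '', '', '', '', 'simple_json.json')
    // After:
    load_json_data.call('test_json_load_case1_2', 'true', '', 'json', '', '', '', '', '', 'simple_json.json')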