This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new b8f93681eb [feature-wip](file reader) Merge broker reader to the new file reader (#14980) b8f93681eb is described below commit b8f93681eb07488263122f8ca408a0226193cdf2 Author: Tiewei Fang <43782773+bepppo...@users.noreply.github.com> AuthorDate: Wed Dec 14 12:48:02 2022 +0800 [feature-wip](file reader) Merge broker reader to the new file reader (#14980) Currently, there are two sets of file readers in Doris, this pr rewrites the old broker reader with the new file reader. TODO: 1. rewrite stream load pipe and kafka consumer pipe --- be/src/io/CMakeLists.txt | 2 + be/src/io/fs/broker_file_reader.cpp | 129 +++++++++ be/src/io/fs/broker_file_reader.h | 57 ++++ be/src/io/fs/broker_file_system.cpp | 308 +++++++++++++++++++++ be/src/io/fs/broker_file_system.h | 74 +++++ be/src/io/fs/file_system.h | 1 + be/src/io/fs/hdfs_file_reader.cpp | 2 +- be/src/io/fs/hdfs_file_system.cpp | 9 +- be/src/util/doris_metrics.cpp | 4 + be/src/util/doris_metrics.h | 2 + .../exec/format/file_reader/new_file_factory.cpp | 18 ++ .../vec/exec/format/file_reader/new_file_factory.h | 5 + 12 files changed, 608 insertions(+), 3 deletions(-) diff --git a/be/src/io/CMakeLists.txt b/be/src/io/CMakeLists.txt index 402a90a800..e3125e946a 100644 --- a/be/src/io/CMakeLists.txt +++ b/be/src/io/CMakeLists.txt @@ -43,6 +43,8 @@ set(IO_FILES fs/s3_file_writer.cpp fs/hdfs_file_system.cpp fs/hdfs_file_reader.cpp + fs/broker_file_system.cpp + fs/broker_file_reader.cpp cache/dummy_file_cache.cpp cache/file_cache.cpp cache/file_cache_manager.cpp diff --git a/be/src/io/fs/broker_file_reader.cpp b/be/src/io/fs/broker_file_reader.cpp new file mode 100644 index 0000000000..e8e54cb736 --- /dev/null +++ b/be/src/io/fs/broker_file_reader.cpp @@ -0,0 +1,129 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "io/fs/broker_file_reader.h" + +#include <gen_cpp/TPaloBrokerService.h> + +#include "common/status.h" +#include "io/fs/broker_file_system.h" +#include "util/doris_metrics.h" + +namespace doris { +namespace io { + +BrokerFileReader::BrokerFileReader(const TNetworkAddress& broker_addr, const Path& path, + size_t file_size, TBrokerFD fd, BrokerFileSystem* fs) + : _path(path), _file_size(file_size), _broker_addr(broker_addr), _fd(fd), _fs(fs) { + DorisMetrics::instance()->broker_file_open_reading->increment(1); + DorisMetrics::instance()->broker_file_reader_total->increment(1); +} + +BrokerFileReader::~BrokerFileReader() { + close(); +} + +Status BrokerFileReader::close() { + bool expected = false; + if (_closed.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) { + TBrokerCloseReaderRequest request; + request.__set_version(TBrokerVersion::VERSION_ONE); + request.__set_fd(_fd); + + TBrokerOperationStatus response; + try { + std::shared_ptr<BrokerServiceConnection> client; + RETURN_IF_ERROR(_fs->get_client(&client)); + try { + (*client)->closeReader(response, request); + } catch (apache::thrift::transport::TTransportException& e) { + std::this_thread::sleep_for(std::chrono::seconds(1)); + RETURN_IF_ERROR((*client).reopen()); + 
(*client)->closeReader(response, request); + } + } catch (apache::thrift::TException& e) { + std::stringstream ss; + ss << "Close broker reader failed, broker:" << _broker_addr << " failed:" << e.what(); + return Status::RpcError(ss.str()); + } + + if (response.statusCode != TBrokerOperationStatusCode::OK) { + std::stringstream ss; + ss << "close broker reader failed, broker:" << _broker_addr + << " failed:" << response.message; + return Status::InternalError(ss.str()); + } + + DorisMetrics::instance()->broker_file_open_reading->increment(-1); + } + return Status::OK(); +} + +Status BrokerFileReader::read_at(size_t offset, Slice result, const IOContext& /*io_ctx*/, + size_t* bytes_read) { + DCHECK(!closed()); + size_t bytes_req = result.size; + char* to = result.data; + *bytes_read = 0; + if (UNLIKELY(bytes_req == 0)) { + return Status::OK(); + } + + TBrokerPReadRequest request; + request.__set_version(TBrokerVersion::VERSION_ONE); + request.__set_fd(_fd); + request.__set_offset(offset); + request.__set_length(bytes_req); + + TBrokerReadResponse response; + std::shared_ptr<BrokerServiceConnection> client; + RETURN_IF_ERROR(_fs->get_client(&client)); + try { + VLOG_RPC << "send pread request to broker:" << _broker_addr << " position:" << offset + << ", read bytes length:" << bytes_req; + try { + (*client)->pread(response, request); + } catch (apache::thrift::transport::TTransportException& e) { + std::this_thread::sleep_for(std::chrono::seconds(1)); + RETURN_IF_ERROR((*client).reopen()); + LOG(INFO) << "retry reading from broker: " << _broker_addr << ". 
reason: " << e.what(); + (*client)->pread(response, request); + } + } catch (apache::thrift::TException& e) { + std::stringstream ss; + ss << "Read from broker failed, broker:" << _broker_addr << " failed:" << e.what(); + return Status::RpcError(ss.str()); + } + + if (response.opStatus.statusCode == TBrokerOperationStatusCode::END_OF_FILE) { + // reached the end of the broker's file: report that zero bytes were read + *bytes_read = 0; + return Status::OK(); + } else if (response.opStatus.statusCode != TBrokerOperationStatusCode::OK) { + std::stringstream ss; + ss << "Read from broker failed, broker:" << _broker_addr + << " failed:" << response.opStatus.message; + return Status::InternalError(ss.str()); + } + + *bytes_read = response.data.size(); + memcpy(to, response.data.data(), *bytes_read); + return Status::OK(); +} + +} // namespace io +} // namespace doris diff --git a/be/src/io/fs/broker_file_reader.h b/be/src/io/fs/broker_file_reader.h new file mode 100644 index 0000000000..e0565dc656 --- /dev/null +++ b/be/src/io/fs/broker_file_reader.h @@ -0,0 +1,57 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +#pragma once + +#include <gen_cpp/PaloBrokerService_types.h> + +#include "io/fs/file_reader.h" +namespace doris { +namespace io { + +class BrokerFileSystem; + +class BrokerFileReader : public FileReader { +public: + BrokerFileReader(const TNetworkAddress& broker_addr, const Path& path, size_t file_size, + TBrokerFD fd, BrokerFileSystem* fs); + + ~BrokerFileReader() override; + + Status close() override; + + Status read_at(size_t offset, Slice result, const IOContext& io_ctx, + size_t* bytes_read) override; + + const Path& path() const override { return _path; } + + size_t size() const override { return _file_size; } + + bool closed() const override { return _closed.load(std::memory_order_acquire); } + +private: + Path _path; // owned copy: open_file() may be called with a temporary Path + size_t _file_size; + + TNetworkAddress _broker_addr; // owned copy so the reader never outlives the address + TBrokerFD _fd; + + BrokerFileSystem* _fs; + std::atomic<bool> _closed = false; +}; +} // namespace io +} // namespace doris diff --git a/be/src/io/fs/broker_file_system.cpp b/be/src/io/fs/broker_file_system.cpp new file mode 100644 index 0000000000..e68edb6253 --- /dev/null +++ b/be/src/io/fs/broker_file_system.cpp @@ -0,0 +1,308 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +#include "io/fs/broker_file_system.h" + +#include <gen_cpp/PaloBrokerService_types.h> +#include <gen_cpp/TPaloBrokerService.h> + +#include "io/fs/broker_file_reader.h" +#include "runtime/broker_mgr.h" +#include "runtime/exec_env.h" +#include "util/defer_op.h" +#include "util/storage_backend.h" + +namespace doris { +namespace io { + +#ifdef BE_TEST +inline BrokerServiceClientCache* client_cache() { + static BrokerServiceClientCache s_client_cache; + return &s_client_cache; +} + +inline const std::string& client_id(const TNetworkAddress& addr) { + static std::string s_client_id = "doris_unit_test"; + return s_client_id; +} +#else +inline BrokerServiceClientCache* client_cache() { + return ExecEnv::GetInstance()->broker_client_cache(); +} + +inline const std::string& client_id(const TNetworkAddress& addr) { + return ExecEnv::GetInstance()->broker_mgr()->get_client_id(addr); +} +#endif + +#ifndef CHECK_BROKER_CLIENT +#define CHECK_BROKER_CLIENT(client) \ + if (!client) { \ + return Status::InternalError("init Broker client error"); \ + } +#endif + +BrokerFileSystem::BrokerFileSystem(const TNetworkAddress& broker_addr, + const std::map<std::string, std::string>& broker_prop) + : RemoteFileSystem("", "", FileSystemType::BROKER), + _broker_addr(broker_addr), + _broker_prop(broker_prop) {} + +Status BrokerFileSystem::connect() { + Status status = Status::OK(); + _client.reset(new BrokerServiceConnection(client_cache(), _broker_addr, + config::thrift_rpc_timeout_ms, &status)); + if (!status.ok()) { + std::stringstream ss; + ss << "failed to get broker client. " + << "broker addr: " << _broker_addr << ". 
msg: " << status.get_error_msg(); + status = Status::InternalError(ss.str()); + } + return status; +} + +Status BrokerFileSystem::open_file(const Path& path, FileReaderSPtr* reader) { + CHECK_BROKER_CLIENT(_client); + TBrokerOpenReaderRequest request; + request.__set_version(TBrokerVersion::VERSION_ONE); + request.__set_path(path); + request.__set_startOffset(0); + request.__set_clientId(client_id(_broker_addr)); + request.__set_properties(_broker_prop); + + TBrokerOpenReaderResponse* response = new TBrokerOpenReaderResponse(); + Defer del_reponse {[&] { delete response; }}; + try { + Status status; + try { + (*_client)->openReader(*response, request); + } catch (apache::thrift::transport::TTransportException& e) { + std::this_thread::sleep_for(std::chrono::seconds(1)); + RETURN_IF_ERROR((*_client).reopen()); + (*_client)->openReader(*response, request); + } + } catch (apache::thrift::TException& e) { + std::stringstream ss; + ss << "Open broker reader failed, broker:" << _broker_addr << " failed: " << e.what(); + return Status::RpcError(ss.str()); + } + + if (response->opStatus.statusCode != TBrokerOperationStatusCode::OK) { + std::stringstream ss; + ss << "Open broker reader failed, broker: " << _broker_addr + << " failed: " << response->opStatus.message; + return Status::InternalError(ss.str()); + } + // TODO(cmy): The file size is no longer got from openReader() method. + // But leave the code here for compatibility. + // This will be removed later. 
+ size_t file_size = 0; + TBrokerFD fd; + if (response->__isset.size) { + file_size = response->size; + } + fd = response->fd; + *reader = std::make_shared<BrokerFileReader>(_broker_addr, path, file_size, fd, this); + return Status::OK(); +} + +Status BrokerFileSystem::delete_file(const Path& path) { + CHECK_BROKER_CLIENT(_client); + try { + // rm file from remote path + TBrokerDeletePathRequest del_req; + TBrokerOperationStatus del_rep; + del_req.__set_version(TBrokerVersion::VERSION_ONE); + del_req.__set_path(path); + del_req.__set_properties(_broker_prop); + + try { + (*_client)->deletePath(del_rep, del_req); + } catch (apache::thrift::transport::TTransportException& e) { + RETURN_IF_ERROR((*_client).reopen()); + (*_client)->deletePath(del_rep, del_req); + } + + if (del_rep.statusCode == TBrokerOperationStatusCode::OK) { + return Status::OK(); + } else { + std::stringstream ss; + ss << "failed to delete from remote path: " << path << ", msg: " << del_rep.message; + return Status::InternalError(ss.str()); + } + } catch (apache::thrift::TException& e) { + std::stringstream ss; + ss << "failed to delete file in remote path: " << path << ", msg: " << e.what(); + return Status::RpcError(ss.str()); + } +} + +Status BrokerFileSystem::create_directory(const Path& /*path*/) { + return Status::NotSupported("create directory not implemented!"); +} + +// Delete all files under path. 
+Status BrokerFileSystem::delete_directory(const Path& path) { + return delete_file(path); +} + +Status BrokerFileSystem::exists(const Path& path, bool* res) const { + CHECK_BROKER_CLIENT(_client); + *res = false; + try { + TBrokerCheckPathExistRequest check_req; + TBrokerCheckPathExistResponse check_rep; + check_req.__set_version(TBrokerVersion::VERSION_ONE); + check_req.__set_path(path); + check_req.__set_properties(_broker_prop); + + try { + (*_client)->checkPathExist(check_rep, check_req); + } catch (apache::thrift::transport::TTransportException& e) { + RETURN_IF_ERROR((*_client).reopen()); + (*_client)->checkPathExist(check_rep, check_req); + } + + if (check_rep.opStatus.statusCode != TBrokerOperationStatusCode::OK) { + std::stringstream ss; + ss << "failed to check exist: " << path << ", msg: " << check_rep.opStatus.message; + LOG(WARNING) << ss.str(); + return Status::InternalError(ss.str()); + } else if (!check_rep.isPathExist) { + return Status::OK(); // absence is reported through *res (false), not as an error + } else { + *res = true; + return Status::OK(); + } + } catch (apache::thrift::TException& e) { + std::stringstream ss; + ss << "failed to check exist: " << path << ", msg: " << e.what(); + return Status::RpcError(ss.str()); + } +} + +Status BrokerFileSystem::file_size(const Path& path, size_t* file_size) const { + CHECK_BROKER_CLIENT(_client); + TBrokerOpenReaderRequest request; + request.__set_version(TBrokerVersion::VERSION_ONE); + request.__set_path(path); + request.__set_startOffset(0); + request.__set_clientId(client_id(_broker_addr)); + request.__set_properties(_broker_prop); + + TBrokerOpenReaderResponse* response = new TBrokerOpenReaderResponse(); + Defer del_reponse {[&] { delete response; }}; + try { + Status status; + try { + (*_client)->openReader(*response, request); + } catch (apache::thrift::transport::TTransportException& e) { + std::this_thread::sleep_for(std::chrono::seconds(1)); + RETURN_IF_ERROR((*_client).reopen()); +
(*_client)->openReader(*response, request); + } + } catch (apache::thrift::TException& e) { + std::stringstream ss; + ss << "Open broker reader failed, broker: " << _broker_addr << " failed: " << e.what(); + return Status::RpcError(ss.str()); + } + + if (response->opStatus.statusCode != TBrokerOperationStatusCode::OK) { + std::stringstream ss; + ss << "Open broker reader failed, broker: " << _broker_addr + << " failed: " << response->opStatus.message; + return Status::RpcError(ss.str()); + } + // TODO(cmy): The file size is no longer got from openReader() method. + // But leave the code here for compatibility. + // This will be removed later. + if (response->__isset.size) { + *file_size = response->size; + } + return Status::OK(); +} + +Status BrokerFileSystem::list(const Path& path, std::vector<Path>* files) { + CHECK_BROKER_CLIENT(_client); + Status status = Status::OK(); + try { + // get existing files from remote path + TBrokerListResponse list_rep; + TBrokerListPathRequest list_req; + list_req.__set_version(TBrokerVersion::VERSION_ONE); + list_req.__set_path(path / "*"); + list_req.__set_isRecursive(false); + list_req.__set_properties(_broker_prop); + list_req.__set_fileNameOnly(true); // we only need file name, not abs path + + try { + (*_client)->listPath(list_rep, list_req); + } catch (apache::thrift::transport::TTransportException& e) { + RETURN_IF_ERROR((*_client).reopen()); + (*_client)->listPath(list_rep, list_req); + } + + if (list_rep.opStatus.statusCode == TBrokerOperationStatusCode::FILE_NOT_FOUND) { + LOG(INFO) << "path does not exist: " << path; + return Status::OK(); + } else if (list_rep.opStatus.statusCode != TBrokerOperationStatusCode::OK) { + std::stringstream ss; + ss << "failed to list files from remote path: " << path + << ", msg: " << list_rep.opStatus.message; + return Status::InternalError(ss.str()); + } + LOG(INFO) << "finished to list files from remote path. 
file num: " << list_rep.files.size(); + + // split file name and checksum + for (const auto& file : list_rep.files) { + if (file.isDir) { + // this is not a file + continue; + } + + const std::string& file_name = file.path; + size_t pos = file_name.find_last_of('.'); + if (pos == std::string::npos || pos == file_name.size() - 1) { + // Not found checksum separator, ignore this file + continue; + } + + FileStat stat = {std::string(file_name, 0, pos), std::string(file_name, pos + 1), + file.size}; + files->emplace_back(std::string(file_name, 0, pos)); + VLOG(2) << "split remote file: " << std::string(file_name, 0, pos) + << ", checksum: " << std::string(file_name, pos + 1); + } + + LOG(INFO) << "finished to split files. valid file num: " << files->size(); + + } catch (apache::thrift::TException& e) { + std::stringstream ss; + ss << "failed to list files in remote path: " << path << ", msg: " << e.what(); + return Status::RpcError(ss.str()); + } + return status; +} + +Status BrokerFileSystem::get_client(std::shared_ptr<BrokerServiceConnection>* client) const { + CHECK_BROKER_CLIENT(_client); + *client = _client; + return Status::OK(); +} + +} // namespace io +} // namespace doris diff --git a/be/src/io/fs/broker_file_system.h b/be/src/io/fs/broker_file_system.h new file mode 100644 index 0000000000..5b6cce33ba --- /dev/null +++ b/be/src/io/fs/broker_file_system.h @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "io/fs/remote_file_system.h" +#include "runtime/client_cache.h" +namespace doris { + +namespace io { +class BrokerFileSystem final : public RemoteFileSystem { +public: + BrokerFileSystem(const TNetworkAddress& broker_addr, + const std::map<std::string, std::string>& broker_prop); + ~BrokerFileSystem() override = default; + + Status create_file(const Path& /*path*/, FileWriterPtr* /*writer*/) override { + return Status::NotSupported("Currently not support to create file through broker."); + } + + Status open_file(const Path& path, FileReaderSPtr* reader) override; + + Status delete_file(const Path& path) override; + + Status create_directory(const Path& path) override; + + // Delete all files under path. 
+ Status delete_directory(const Path& path) override; + + Status link_file(const Path& /*src*/, const Path& /*dest*/) override { + return Status::NotSupported("Not supported link file through broker."); + } + + Status exists(const Path& path, bool* res) const override; + + Status file_size(const Path& path, size_t* file_size) const override; + + Status list(const Path& path, std::vector<Path>* files) override; + + Status upload(const Path& /*local_path*/, const Path& /*dest_path*/) override { + return Status::NotSupported("Currently not support to upload file through broker."); + } + + Status batch_upload(const std::vector<Path>& /*local_paths*/, + const std::vector<Path>& /*dest_paths*/) override { + return Status::NotSupported("Currently not support to batch upload file through broker."); + } + + Status connect() override; + + Status get_client(std::shared_ptr<BrokerServiceConnection>* client) const; + +private: + TNetworkAddress _broker_addr; // stored by value: the file system may outlive the caller's objects + std::map<std::string, std::string> _broker_prop; + + std::shared_ptr<BrokerServiceConnection> _client; +}; +} // namespace io +} // namespace doris diff --git a/be/src/io/fs/file_system.h b/be/src/io/fs/file_system.h index d97148f4c5..2dde2a64e7 100644 --- a/be/src/io/fs/file_system.h +++ b/be/src/io/fs/file_system.h @@ -38,6 +38,7 @@ enum class FileSystemType : uint8_t { LOCAL, S3, HDFS, + BROKER, }; class FileSystem { diff --git a/be/src/io/fs/hdfs_file_reader.cpp b/be/src/io/fs/hdfs_file_reader.cpp index 1b77e64238..ef03541387 100644 --- a/be/src/io/fs/hdfs_file_reader.cpp +++ b/be/src/io/fs/hdfs_file_reader.cpp @@ -64,8 +64,8 @@ Status HdfsFileReader::read_at(size_t offset, Slice result, const IOContext& /*i size_t bytes_req = result.size; char* to = result.data; bytes_req = std::min(bytes_req, _file_size - offset); + *bytes_read = 0; if (UNLIKELY(bytes_req == 0)) { - *bytes_read = 0; return Status::OK(); } diff --git a/be/src/io/fs/hdfs_file_system.cpp b/be/src/io/fs/hdfs_file_system.cpp index d219e0393a..cafa7ca34c
100644 --- a/be/src/io/fs/hdfs_file_system.cpp +++ b/be/src/io/fs/hdfs_file_system.cpp @@ -96,7 +96,7 @@ Status HdfsFileSystem::create_file(const Path& /*path*/, FileWriterPtr* /*writer // } // hdfsCloseFile(handle->hdfs_fs, hdfs_file); // return Status::OK(); - return Status::NotSupported("Currently not support to upload file to HDFS"); + return Status::NotSupported("Currently not support to create file to HDFS"); } Status HdfsFileSystem::open_file(const Path& path, FileReaderSPtr* reader) { @@ -161,7 +161,12 @@ Status HdfsFileSystem::delete_directory(const Path& path) { Status HdfsFileSystem::exists(const Path& path, bool* res) const { CHECK_HDFS_HANDLE(_fs_handle); - *res = hdfsExists(_fs_handle->hdfs_fs, path.string().c_str()); + int is_exists = hdfsExists(_fs_handle->hdfs_fs, path.string().c_str()); + if (is_exists == 0) { + *res = true; + } else { + *res = false; + } return Status::OK(); } diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp index d3bebc7eb0..e5dad525fb 100644 --- a/be/src/util/doris_metrics.cpp +++ b/be/src/util/doris_metrics.cpp @@ -171,6 +171,7 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(upload_fail_count, MetricUnit::ROWSETS); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(local_file_reader_total, MetricUnit::FILESYSTEM); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(s3_file_reader_total, MetricUnit::FILESYSTEM); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(hdfs_file_reader_total, MetricUnit::FILESYSTEM); +DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(broker_file_reader_total, MetricUnit::FILESYSTEM); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(local_file_writer_total, MetricUnit::FILESYSTEM); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(s3_file_writer_total, MetricUnit::FILESYSTEM); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(file_created_total, MetricUnit::FILESYSTEM); @@ -183,6 +184,7 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(s3_bytes_written_total, MetricUnit::FILESYS DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(local_file_open_reading, MetricUnit::FILESYSTEM); 
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(s3_file_open_reading, MetricUnit::FILESYSTEM); DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(hdfs_file_open_reading, MetricUnit::FILESYSTEM); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(broker_file_open_reading, MetricUnit::FILESYSTEM); DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(local_file_open_writing, MetricUnit::FILESYSTEM); DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(s3_file_open_writing, MetricUnit::FILESYSTEM); @@ -301,6 +303,7 @@ DorisMetrics::DorisMetrics() : _metric_registry(_s_registry_name) { INT_COUNTER_METRIC_REGISTER(_server_metric_entity, local_file_reader_total); INT_COUNTER_METRIC_REGISTER(_server_metric_entity, s3_file_reader_total); INT_COUNTER_METRIC_REGISTER(_server_metric_entity, hdfs_file_reader_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, broker_file_reader_total); INT_COUNTER_METRIC_REGISTER(_server_metric_entity, local_file_writer_total); INT_COUNTER_METRIC_REGISTER(_server_metric_entity, s3_file_writer_total); INT_COUNTER_METRIC_REGISTER(_server_metric_entity, file_created_total); @@ -312,6 +315,7 @@ DorisMetrics::DorisMetrics() : _metric_registry(_s_registry_name) { INT_GAUGE_METRIC_REGISTER(_server_metric_entity, local_file_open_reading); INT_GAUGE_METRIC_REGISTER(_server_metric_entity, s3_file_open_reading); INT_GAUGE_METRIC_REGISTER(_server_metric_entity, hdfs_file_open_reading); + INT_GAUGE_METRIC_REGISTER(_server_metric_entity, broker_file_open_reading); INT_GAUGE_METRIC_REGISTER(_server_metric_entity, local_file_open_writing); INT_GAUGE_METRIC_REGISTER(_server_metric_entity, s3_file_open_writing); } diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h index e59009a99c..a43858d055 100644 --- a/be/src/util/doris_metrics.h +++ b/be/src/util/doris_metrics.h @@ -155,6 +155,7 @@ public: IntCounter* local_file_reader_total; IntCounter* s3_file_reader_total; IntCounter* hdfs_file_reader_total; + IntCounter* broker_file_reader_total; IntCounter* local_file_writer_total; IntCounter* s3_file_writer_total; 
IntCounter* file_created_total; @@ -166,6 +167,7 @@ public: IntGauge* local_file_open_reading; IntGauge* s3_file_open_reading; IntGauge* hdfs_file_open_reading; + IntGauge* broker_file_open_reading; IntGauge* local_file_open_writing; IntGauge* s3_file_open_writing; diff --git a/be/src/vec/exec/format/file_reader/new_file_factory.cpp b/be/src/vec/exec/format/file_reader/new_file_factory.cpp index 23d6fa4eab..b8f2252885 100644 --- a/be/src/vec/exec/format/file_reader/new_file_factory.cpp +++ b/be/src/vec/exec/format/file_reader/new_file_factory.cpp @@ -20,6 +20,7 @@ #include "io/broker_reader.h" #include "io/broker_writer.h" #include "io/buffered_reader.h" +#include "io/fs/broker_file_system.h" #include "io/fs/file_system.h" #include "io/fs/hdfs_file_system.h" #include "io/fs/s3_file_system.h" @@ -126,6 +127,12 @@ Status NewFileFactory::create_file_reader(RuntimeProfile* /*profile*/, &file_system_ptr, file_reader)); break; } + case TFileType::FILE_BROKER: { + RETURN_IF_ERROR(create_broker_reader(system_properties.broker_addresses[0], + system_properties.properties, file_description.path, + &file_system_ptr, file_reader)); + break; + } default: return Status::NotSupported("unsupported file reader type: {}", std::to_string(type)); } @@ -179,4 +186,15 @@ Status NewFileFactory::create_s3_reader(const std::map<std::string, std::string> (*s3_file_system)->open_file(s3_uri.get_key(), reader); return Status::OK(); } + +Status NewFileFactory::create_broker_reader(const TNetworkAddress& broker_addr, + const std::map<std::string, std::string>& prop, + const std::string& path, + io::FileSystem** broker_file_system, + io::FileReaderSPtr* reader) { + *broker_file_system = new io::BrokerFileSystem(broker_addr, prop); + RETURN_IF_ERROR((dynamic_cast<io::BrokerFileSystem*>(*broker_file_system))->connect()); + RETURN_IF_ERROR((*broker_file_system)->open_file(path, reader)); // never hand back an unopened reader + return Status::OK(); +} } // namespace doris diff --git a/be/src/vec/exec/format/file_reader/new_file_factory.h
b/be/src/vec/exec/format/file_reader/new_file_factory.h index 82fc1fe6cf..8167618859 100644 --- a/be/src/vec/exec/format/file_reader/new_file_factory.h +++ b/be/src/vec/exec/format/file_reader/new_file_factory.h @@ -92,6 +92,11 @@ public: const std::string& path, io::FileSystem** s3_file_system, io::FileReaderSPtr* reader); + static Status create_broker_reader(const TNetworkAddress& broker_addr, + const std::map<std::string, std::string>& prop, + const std::string& path, io::FileSystem** broker_file_system, + io::FileReaderSPtr* reader); + static TFileType::type convert_storage_type(TStorageBackendType::type type) { switch (type) { case TStorageBackendType::LOCAL: --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org