github-actions[bot] commented on code in PR #14875: URL: https://github.com/apache/doris/pull/14875#discussion_r1041822077
########## be/src/vec/exec/format/file_reader/new_plain_text_line_reader.h: ########## @@ -0,0 +1,100 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "exec/line_reader.h" +#include "util/runtime_profile.h" + +namespace doris { +namespace io { +class FileReader; +} + +class Decompressor; +class Status; + +class NewPlainTextLineReader : public LineReader { +public: + NewPlainTextLineReader(RuntimeProfile* profile, io::FileReader* file_reader, + Decompressor* decompressor, size_t length, + const std::string& line_delimiter, size_t line_delimiter_length, + size_t current_offset); + + ~NewPlainTextLineReader() override; + + Status read_line(const uint8_t** ptr, size_t* size, bool* eof) override; + + void close() override; + +private: + bool update_eof(); + + size_t output_buf_read_remaining() const { return _output_buf_limit - _output_buf_pos; } + + size_t input_buf_read_remaining() const { return _input_buf_limit - _input_buf_pos; } + + bool done() { return _file_eof && output_buf_read_remaining() == 0; } + + // find line delimiter from 'start' to 'start' + len, + // return line delimiter pos if found, otherwise return nullptr. 
+ // TODO: + // save to positions of field separator + uint8_t* update_field_pos_and_find_line_delimiter(const uint8_t* start, size_t len); + + void extend_input_buf(); + void extend_output_buf(); + +private: Review Comment: warning: redundant access specifier has the same accessibility as the previous access specifier [readability-redundant-access-specifiers] ```suggestion ``` **be/src/vec/exec/format/file_reader/new_plain_text_line_reader.h:43:** previously declared here ```cpp private: ^ ``` ########## be/src/io/fs/hdfs_file_system.h: ########## @@ -0,0 +1,159 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <gen_cpp/PlanNodes_types.h> +#include <hdfs/hdfs.h> + +#include "common/status.h" +#include "io/fs/remote_file_system.h" +#include "io/hdfs_file_reader.h" +namespace doris { + +namespace io { + +class HdfsFileSystemHandle { +public: + HdfsFileSystemHandle(hdfsFS fs, bool cached) + : hdfs_fs(fs), from_cache(cached), _ref_cnt(0), _last_access_time(0), _invalid(false) {} + + ~HdfsFileSystemHandle() { + DCHECK(_ref_cnt == 0); + if (hdfs_fs != nullptr) { + // Even if there is an error, the resources associated with the hdfsFS will be freed. 
+ hdfsDisconnect(hdfs_fs); + } + hdfs_fs = nullptr; + } + + int64_t last_access_time() { return _last_access_time; } + + void inc_ref() { + _ref_cnt++; + _last_access_time = _now(); + } + + void dec_ref() { + _ref_cnt--; + _last_access_time = _now(); + } + + int ref_cnt() { return _ref_cnt; } + + bool invalid() { return _invalid; } + + void set_invalid() { _invalid = true; } + + hdfsFS hdfs_fs; + // When cache is full, and all handlers are in use, HdfsFileSystemCache will return an uncached handler. + // Client should delete the handler in such case. + const bool from_cache; + +private: + // the number of referenced client + std::atomic<int> _ref_cnt; + // HdfsFileSystemCache try to remove the oldest handler when the cache is full + std::atomic<uint64_t> _last_access_time; + // Client will set invalid if error thrown, and HdfsFileSystemCache will not reuse this handler + std::atomic<bool> _invalid; + + uint64_t _now() { + return std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::system_clock::now().time_since_epoch()) + .count(); + } +}; + +// Cache for HdfsFileSystemHandle +class HdfsFileSystemCache { +public: + static int MAX_CACHE_HANDLE; + + static HdfsFileSystemCache* instance() { + static HdfsFileSystemCache s_instance; + return &s_instance; + } + + HdfsFileSystemCache(const HdfsFileSystemCache&) = delete; + const HdfsFileSystemCache& operator=(const HdfsFileSystemCache&) = delete; + + // This function is thread-safe + Status get_connection(THdfsParams& hdfs_params, HdfsFileSystemHandle** fs_handle); + +private: + std::mutex _lock; + std::unordered_map<uint64, std::unique_ptr<HdfsFileSystemHandle>> _cache; + + HdfsFileSystemCache() = default; + + uint64 _hdfs_hash_code(THdfsParams& hdfs_params); + Status _create_fs(THdfsParams& hdfs_params, hdfsFS* fs); + void _clean_invalid(); + void _clean_oldest(); +}; + +class HdfsFileSystem final : public RemoteFileSystem { +public: + HdfsFileSystem(THdfsParams hdfs_params, const std::string& path); + 
~HdfsFileSystem() override; + + Status create_file(const Path& path, FileWriterPtr* writer) override; + + Status open_file(const Path& path, FileReaderSPtr* reader) override; + + Status delete_file(const Path& path) override; + + Status create_directory(const Path& path) override; + + // Delete all files under path. + Status delete_directory(const Path& path) override; + + Status link_file(const Path& src, const Path& dest) override { Review Comment: warning: parameter 'src' is unused [misc-unused-parameters] ```suggestion Status link_file(const Path& /*src*/, const Path& dest) override { ``` ########## be/src/vec/exec/format/file_reader/new_file_factory.cpp: ########## @@ -0,0 +1,144 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "vec/exec/format/file_reader/new_file_factory.h" + +#include "io/broker_reader.h" +#include "io/broker_writer.h" +#include "io/buffered_reader.h" +#include "io/fs/file_system.h" +#include "io/hdfs_reader_writer.h" +#include "io/local_file_reader.h" +#include "io/local_file_writer.h" +#include "io/s3_reader.h" +#include "io/s3_writer.h" +#include "runtime/exec_env.h" +#include "runtime/stream_load/load_stream_mgr.h" + +namespace doris { + +Status NewFileFactory::create_file_writer(TFileType::type type, ExecEnv* env, + const std::vector<TNetworkAddress>& broker_addresses, + const std::map<std::string, std::string>& properties, + const std::string& path, int64_t start_offset, + std::unique_ptr<FileWriter>& file_writer) { + switch (type) { + case TFileType::FILE_LOCAL: { + file_writer.reset(new LocalFileWriter(path, start_offset)); + break; + } + case TFileType::FILE_BROKER: { + file_writer.reset(new BrokerWriter(env, broker_addresses, properties, path, start_offset)); + break; + } + case TFileType::FILE_S3: { + file_writer.reset(new S3Writer(properties, path, start_offset)); + break; + } + case TFileType::FILE_HDFS: { + RETURN_IF_ERROR(HdfsReaderWriter::create_writer( + const_cast<std::map<std::string, std::string>&>(properties), path, file_writer)); + break; + } + default: + return Status::InternalError("unsupported file writer type: {}", std::to_string(type)); + } + + return Status::OK(); +} + +// ============================ +// broker scan node/unique ptr +Status NewFileFactory::create_file_reader(TFileType::type type, ExecEnv* env, + RuntimeProfile* profile, + const std::vector<TNetworkAddress>& broker_addresses, + const std::map<std::string, std::string>& properties, + const TBrokerRangeDesc& range, int64_t start_offset, + std::unique_ptr<FileReader>& file_reader) { + FileReader* file_reader_ptr; + switch (type) { + case TFileType::FILE_LOCAL: { + file_reader_ptr = new LocalFileReader(range.path, start_offset); + break; + } + case 
TFileType::FILE_BROKER: { + file_reader_ptr = new BufferedReader( + profile, + new BrokerReader(env, broker_addresses, properties, range.path, start_offset, + range.__isset.file_size ? range.file_size : 0)); + break; + } + case TFileType::FILE_S3: { + file_reader_ptr = + new BufferedReader(profile, new S3Reader(properties, range.path, start_offset)); + break; + } + case TFileType::FILE_HDFS: { + FileReader* hdfs_reader = nullptr; + RETURN_IF_ERROR(HdfsReaderWriter::create_reader(range.hdfs_params, range.path, start_offset, + &hdfs_reader)); + file_reader_ptr = new BufferedReader(profile, hdfs_reader); + break; + } + default: + return Status::InternalError("unsupported file reader type: " + std::to_string(type)); + } + file_reader.reset(file_reader_ptr); + + return Status::OK(); +} + +// ============================ +// file scan node/unique ptr +Status NewFileFactory::create_file_reader(RuntimeProfile* profile, Review Comment: warning: parameter 'profile' is unused [misc-unused-parameters] ```suggestion Status NewFileFactory::create_file_reader(RuntimeProfile* /*profile*/, ``` ########## be/src/io/fs/hdfs_file_system.h: ########## @@ -0,0 +1,159 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include <gen_cpp/PlanNodes_types.h> +#include <hdfs/hdfs.h> + +#include "common/status.h" +#include "io/fs/remote_file_system.h" +#include "io/hdfs_file_reader.h" +namespace doris { + +namespace io { + +class HdfsFileSystemHandle { +public: + HdfsFileSystemHandle(hdfsFS fs, bool cached) + : hdfs_fs(fs), from_cache(cached), _ref_cnt(0), _last_access_time(0), _invalid(false) {} + + ~HdfsFileSystemHandle() { + DCHECK(_ref_cnt == 0); + if (hdfs_fs != nullptr) { + // Even if there is an error, the resources associated with the hdfsFS will be freed. + hdfsDisconnect(hdfs_fs); + } + hdfs_fs = nullptr; + } + + int64_t last_access_time() { return _last_access_time; } + + void inc_ref() { + _ref_cnt++; + _last_access_time = _now(); + } + + void dec_ref() { + _ref_cnt--; + _last_access_time = _now(); + } + + int ref_cnt() { return _ref_cnt; } + + bool invalid() { return _invalid; } + + void set_invalid() { _invalid = true; } + + hdfsFS hdfs_fs; + // When cache is full, and all handlers are in use, HdfsFileSystemCache will return an uncached handler. + // Client should delete the handler in such case. 
+ const bool from_cache; + +private: + // the number of referenced client + std::atomic<int> _ref_cnt; + // HdfsFileSystemCache try to remove the oldest handler when the cache is full + std::atomic<uint64_t> _last_access_time; + // Client will set invalid if error thrown, and HdfsFileSystemCache will not reuse this handler + std::atomic<bool> _invalid; + + uint64_t _now() { + return std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::system_clock::now().time_since_epoch()) + .count(); + } +}; + +// Cache for HdfsFileSystemHandle +class HdfsFileSystemCache { +public: + static int MAX_CACHE_HANDLE; + + static HdfsFileSystemCache* instance() { + static HdfsFileSystemCache s_instance; + return &s_instance; + } + + HdfsFileSystemCache(const HdfsFileSystemCache&) = delete; + const HdfsFileSystemCache& operator=(const HdfsFileSystemCache&) = delete; + + // This function is thread-safe + Status get_connection(THdfsParams& hdfs_params, HdfsFileSystemHandle** fs_handle); + +private: + std::mutex _lock; + std::unordered_map<uint64, std::unique_ptr<HdfsFileSystemHandle>> _cache; + + HdfsFileSystemCache() = default; + + uint64 _hdfs_hash_code(THdfsParams& hdfs_params); + Status _create_fs(THdfsParams& hdfs_params, hdfsFS* fs); + void _clean_invalid(); + void _clean_oldest(); +}; + +class HdfsFileSystem final : public RemoteFileSystem { +public: + HdfsFileSystem(THdfsParams hdfs_params, const std::string& path); + ~HdfsFileSystem() override; + + Status create_file(const Path& path, FileWriterPtr* writer) override; + + Status open_file(const Path& path, FileReaderSPtr* reader) override; + + Status delete_file(const Path& path) override; + + Status create_directory(const Path& path) override; + + // Delete all files under path. 
+ Status delete_directory(const Path& path) override; + + Status link_file(const Path& src, const Path& dest) override { Review Comment: warning: parameter 'dest' is unused [misc-unused-parameters] ```suggestion Status link_file(const Path& src, const Path& /*dest*/) override { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org