platoneko commented on code in PR #33796: URL: https://github.com/apache/doris/pull/33796#discussion_r1568707690
########## be/src/io/fs/hdfs_file_writer.cpp: ########## @@ -139,7 +249,11 @@ Result<FileWriterPtr> HdfsFileWriter::create(Path full_path, HdfsHandler* handle } } // open file - auto* hdfs_file = hdfsOpenFile(handler->hdfs_fs, path.c_str(), O_WRONLY, 0, 0, 0); + struct hdfsFile_internal* hdfs_file = nullptr; Review Comment: ```suggestion hdfsFile hdfs_file = nullptr; ``` ########## be/src/io/fs/hdfs_file_writer.h: ########## @@ -48,13 +50,23 @@ class HdfsFileWriter final : public FileWriter { bool closed() const override { return _closed; } private: + Status _write_into_batch(Slice data); + void _write_into_local_file_cache(); + Path _path; HdfsHandler* _hdfs_handler = nullptr; hdfsFile _hdfs_file = nullptr; std::string _fs_name; size_t _bytes_appended = 0; bool _closed = false; bool _sync_file_data; + uint64_t _expiration_time; Review Comment: Consider encapsulating these variables in an internal class ```c++ class Buffer { public: Buffer(size_t capacity); bool empty(); // Return the number of bytes written size_t append(void* data, size_t size); bool full(); size_t size(); size_t capacity(); private: ... }; ``` ########## be/src/io/fs/hdfs_file_writer.cpp: ########## @@ -128,7 +234,11 @@ Result<FileWriterPtr> HdfsFileWriter::create(Path full_path, HdfsHandler* handle if (exists != 0) { // FIXME(plat1ko): Directly return error here? 
VLOG_NOTICE << "hdfs dir doesn't exist, create it: " << hdfs_dir; - int ret = hdfsCreateDirectory(handler->hdfs_fs, hdfs_dir.c_str()); + int ret; Review Comment: 实测 HDFS 会自动创建上级目录,这些操作是多此一举,可以删了 ########## be/src/io/fs/hdfs_file_writer.cpp: ########## @@ -51,32 +72,129 @@ HdfsFileWriter::~HdfsFileWriter() { } else { delete _hdfs_handler; } + hdfs_file_being_written << -1; } Status HdfsFileWriter::close() { if (_closed) { return Status::OK(); } _closed = true; - + int ret; if (_sync_file_data) { - int ret = hdfsHSync(_hdfs_handler->hdfs_fs, _hdfs_file); + { + SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_hsync_latency); + ret = hdfsHSync(_hdfs_handler->hdfs_fs, _hdfs_file); + } + if (ret != 0) { return Status::InternalError("failed to sync hdfs file. fs_name={} path={} : {}", _fs_name, _path.native(), hdfs_error()); } } - // The underlying implementation will invoke `hdfsHFlush` to flush buffered data and wait for - // the HDFS response, but won't guarantee the synchronization of data to HDFS. - int ret = hdfsCloseFile(_hdfs_handler->hdfs_fs, _hdfs_file); + { + SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_flush_latency); + // The underlying implementation will invoke `hdfsHFlush` to flush buffered data and wait for + // the HDFS response, but won't guarantee the synchronization of data to HDFS. + ret = hdfsFlush(_hdfs_handler->hdfs_fs, _hdfs_file); + } + if (ret == -1) { + std::stringstream ss; + ss << "failed to flush hdfs file. " + << "fs_name:" << _fs_name << " path:" << _path << ", err: " << hdfs_error(); + LOG(WARNING) << ss.str(); + return Status::InternalError(ss.str()); + } + { + SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_close_latency); + ret = hdfsCloseFile(_hdfs_handler->hdfs_fs, _hdfs_file); + } _hdfs_file = nullptr; if (ret != 0) { return Status::InternalError( "Write hdfs file failed. 
(BE: {}) namenode:{}, path:{}, err: {}", BackendOptions::get_localhost(), _fs_name, _path.native(), hdfs_error()); } + hdfs_file_created_total << 1; + return Status::OK(); +} + +static FileBlocksHolderPtr allocate_cache_holder(BlockFileCache* cache, UInt128Wrapper cache_hash, + size_t offset, uint64_t expiration_time, + bool is_cold) { + CacheContext ctx; + ctx.cache_type = expiration_time == 0 ? FileCacheType::NORMAL : FileCacheType::TTL; + ctx.expiration_time = expiration_time; + ctx.is_cold_data = is_cold; + auto holder = cache->get_or_set(cache_hash, offset, config::hdfs_write_batch_buffer_size, ctx); + return std::make_unique<FileBlocksHolder>(std::move(holder)); +} +// TODO(ByteYue): Refactor Upload Buffer to reduce this duplicate code +void HdfsFileWriter::_write_into_local_file_cache() { + auto _holder = + allocate_cache_holder(_cache, _cache_hash, _bytes_appended - _batch_buffer.size(), + _expiration_time, _is_cold_data); + size_t pos = 0; + size_t data_remain_size = _batch_buffer.size(); + for (auto& block : _holder->file_blocks) { + if (data_remain_size == 0) { + break; + } + size_t block_size = block->range().size(); + size_t append_size = std::min(data_remain_size, block_size); + if (block->state() == FileBlock::State::EMPTY) { + if (_index_offset != 0 && block->range().right >= _index_offset) { + static_cast<void>(block->change_cache_type_self(FileCacheType::INDEX)); + } + block->get_or_set_downloader(); + if (block->is_downloader()) { + Slice s(_batch_buffer.data() + pos, append_size); + Status st = block->append(s); + if (st.ok()) { + st = block->finalize(); + } + if (!st.ok()) { + LOG_WARNING("failed to append data to file cache").error(st); + } + } + } + data_remain_size -= append_size; + pos += append_size; + } + _batch_buffer.clear(); +} + +Status HdfsFileWriter::_write_into_batch(Slice data) { + while (!data.empty()) { + size_t append_size = std::min(config::hdfs_write_batch_buffer_size - _batch_buffer.size(), + data.get_size()); + 
std::string_view sv(data.get_data(), append_size); + _batch_buffer.append(sv); + data.remove_prefix(append_size); + if (_batch_buffer.size() == config::hdfs_write_batch_buffer_size) { + size_t left_bytes = config::hdfs_write_batch_buffer_size; + const char* p = _batch_buffer.data(); + while (left_bytes > 0) { + int64_t written_bytes; + { + SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_write_latency); + written_bytes = hdfsWrite(_hdfs_handler->hdfs_fs, _hdfs_file, p, left_bytes); + } + if (written_bytes < 0) { + return Status::InternalError( + "write hdfs failed. fs_name: {}, path: {}, error: {}", _fs_name, + _path.native(), hdfs_error()); + } + hdfs_bytes_written_total << written_bytes; + left_bytes -= written_bytes; + p += written_bytes; + _bytes_appended += written_bytes; + } Review Comment: ```suggestion RETURN_IF_ERROR(append_hdfs_file(...)); ``` ########## be/src/io/fs/hdfs_file_writer.cpp: ########## @@ -51,32 +72,129 @@ HdfsFileWriter::~HdfsFileWriter() { } else { delete _hdfs_handler; } + hdfs_file_being_written << -1; } Status HdfsFileWriter::close() { if (_closed) { return Status::OK(); } _closed = true; Review Comment: ``` if (...) 
{ // buffer not empty // Write buffer to HDFS and file cache } ``` ########## be/src/io/fs/hdfs_file_writer.cpp: ########## @@ -51,32 +72,129 @@ HdfsFileWriter::~HdfsFileWriter() { } else { delete _hdfs_handler; } + hdfs_file_being_written << -1; } Status HdfsFileWriter::close() { if (_closed) { return Status::OK(); } _closed = true; Review Comment: Also do this in `finalize` before calling `hdfsFlush` ########## be/src/common/config.cpp: ########## @@ -1028,6 +1028,7 @@ DEFINE_mInt64(s3_write_buffer_size, "5242880"); // The timeout config for S3 buffer allocation DEFINE_mInt32(s3_writer_buffer_allocation_timeout, "300"); DEFINE_mInt64(file_cache_max_file_reader_cache_size, "1000000"); +DEFINE_mInt64(hdfs_write_batch_buffer_size, "4194304"); Review Comment: ```suggestion DEFINE_mInt64(hdfs_write_batch_buffer_size, "4194304"); // 4MB ``` ########## be/src/io/fs/hdfs_file_writer.cpp: ########## @@ -51,32 +72,129 @@ HdfsFileWriter::~HdfsFileWriter() { } else { delete _hdfs_handler; } + hdfs_file_being_written << -1; } Status HdfsFileWriter::close() { if (_closed) { return Status::OK(); } _closed = true; - + int ret; if (_sync_file_data) { - int ret = hdfsHSync(_hdfs_handler->hdfs_fs, _hdfs_file); + { + SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_hsync_latency); + ret = hdfsHSync(_hdfs_handler->hdfs_fs, _hdfs_file); + } + if (ret != 0) { return Status::InternalError("failed to sync hdfs file. fs_name={} path={} : {}", _fs_name, _path.native(), hdfs_error()); } } - // The underlying implementation will invoke `hdfsHFlush` to flush buffered data and wait for - // the HDFS response, but won't guarantee the synchronization of data to HDFS. 
+ ret = hdfsFlush(_hdfs_handler->hdfs_fs, _hdfs_file); + } + if (ret == -1) { + std::stringstream ss; + ss << "failed to flush hdfs file. " + << "fs_name:" << _fs_name << " path:" << _path << ", err: " << hdfs_error(); + LOG(WARNING) << ss.str(); + return Status::InternalError(ss.str()); + } + { + SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_close_latency); + ret = hdfsCloseFile(_hdfs_handler->hdfs_fs, _hdfs_file); + } _hdfs_file = nullptr; if (ret != 0) { return Status::InternalError( "Write hdfs file failed. (BE: {}) namenode:{}, path:{}, err: {}", BackendOptions::get_localhost(), _fs_name, _path.native(), hdfs_error()); } + hdfs_file_created_total << 1; + return Status::OK(); +} + +static FileBlocksHolderPtr allocate_cache_holder(BlockFileCache* cache, UInt128Wrapper cache_hash, + size_t offset, uint64_t expiration_time, + bool is_cold) { + CacheContext ctx; + ctx.cache_type = expiration_time == 0 ? FileCacheType::NORMAL : FileCacheType::TTL; + ctx.expiration_time = expiration_time; + ctx.is_cold_data = is_cold; + auto holder = cache->get_or_set(cache_hash, offset, config::hdfs_write_batch_buffer_size, ctx); + return std::make_unique<FileBlocksHolder>(std::move(holder)); +} +// TODO(ByteYue): Refactor Upload Buffer to reduce this duplicate code +void HdfsFileWriter::_write_into_local_file_cache() { + auto _holder = + allocate_cache_holder(_cache, _cache_hash, _bytes_appended - _batch_buffer.size(), + _expiration_time, _is_cold_data); + size_t pos = 0; + size_t data_remain_size = _batch_buffer.size(); + for (auto& block : _holder->file_blocks) { + if (data_remain_size == 0) { + break; + } + size_t block_size = block->range().size(); + size_t append_size = std::min(data_remain_size, block_size); + if (block->state() == FileBlock::State::EMPTY) { + if (_index_offset != 0 && block->range().right >= _index_offset) { + static_cast<void>(block->change_cache_type_self(FileCacheType::INDEX)); + } + block->get_or_set_downloader(); + if (block->is_downloader()) { + Slice 
s(_batch_buffer.data() + pos, append_size); + Status st = block->append(s); + if (st.ok()) { + st = block->finalize(); + } + if (!st.ok()) { + LOG_WARNING("failed to append data to file cache").error(st); + } + } + } + data_remain_size -= append_size; + pos += append_size; + } + _batch_buffer.clear(); +} + +Status HdfsFileWriter::_write_into_batch(Slice data) { + while (!data.empty()) { + size_t append_size = std::min(config::hdfs_write_batch_buffer_size - _batch_buffer.size(), + data.get_size()); + std::string_view sv(data.get_data(), append_size); + _batch_buffer.append(sv); + data.remove_prefix(append_size); + if (_batch_buffer.size() == config::hdfs_write_batch_buffer_size) { Review Comment: ```suggestion if (_buffer.full()) { ``` ########## be/src/io/fs/hdfs_file_writer.cpp: ########## @@ -51,32 +72,129 @@ HdfsFileWriter::~HdfsFileWriter() { } else { delete _hdfs_handler; } + hdfs_file_being_written << -1; } Status HdfsFileWriter::close() { if (_closed) { return Status::OK(); } _closed = true; - + int ret; if (_sync_file_data) { - int ret = hdfsHSync(_hdfs_handler->hdfs_fs, _hdfs_file); + { + SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_hsync_latency); + ret = hdfsHSync(_hdfs_handler->hdfs_fs, _hdfs_file); + } + if (ret != 0) { return Status::InternalError("failed to sync hdfs file. fs_name={} path={} : {}", _fs_name, _path.native(), hdfs_error()); } } - // The underlying implementation will invoke `hdfsHFlush` to flush buffered data and wait for - // the HDFS response, but won't guarantee the synchronization of data to HDFS. - int ret = hdfsCloseFile(_hdfs_handler->hdfs_fs, _hdfs_file); + { + SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_flush_latency); + // The underlying implementation will invoke `hdfsHFlush` to flush buffered data and wait for + // the HDFS response, but won't guarantee the synchronization of data to HDFS. + ret = hdfsFlush(_hdfs_handler->hdfs_fs, _hdfs_file); + } + if (ret == -1) { + std::stringstream ss; + ss << "failed to flush hdfs file. 
" + << "fs_name:" << _fs_name << " path:" << _path << ", err: " << hdfs_error(); + LOG(WARNING) << ss.str(); + return Status::InternalError(ss.str()); + } Review Comment: No need to call `hdfsFlush` here, since `hdfsCloseFile` will issue `HFlush`. ########## be/src/io/fs/hdfs_file_writer.cpp: ########## @@ -51,32 +72,129 @@ HdfsFileWriter::~HdfsFileWriter() { } else { delete _hdfs_handler; } + hdfs_file_being_written << -1; } Status HdfsFileWriter::close() { if (_closed) { return Status::OK(); } _closed = true; - + int ret; if (_sync_file_data) { - int ret = hdfsHSync(_hdfs_handler->hdfs_fs, _hdfs_file); + { + SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_hsync_latency); + ret = hdfsHSync(_hdfs_handler->hdfs_fs, _hdfs_file); + } + if (ret != 0) { return Status::InternalError("failed to sync hdfs file. fs_name={} path={} : {}", _fs_name, _path.native(), hdfs_error()); } } - // The underlying implementation will invoke `hdfsHFlush` to flush buffered data and wait for - // the HDFS response, but won't guarantee the synchronization of data to HDFS. - int ret = hdfsCloseFile(_hdfs_handler->hdfs_fs, _hdfs_file); + { + SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_flush_latency); + // The underlying implementation will invoke `hdfsHFlush` to flush buffered data and wait for + // the HDFS response, but won't guarantee the synchronization of data to HDFS. + ret = hdfsFlush(_hdfs_handler->hdfs_fs, _hdfs_file); + } + if (ret == -1) { + std::stringstream ss; + ss << "failed to flush hdfs file. " + << "fs_name:" << _fs_name << " path:" << _path << ", err: " << hdfs_error(); + LOG(WARNING) << ss.str(); + return Status::InternalError(ss.str()); + } + { + SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_close_latency); + ret = hdfsCloseFile(_hdfs_handler->hdfs_fs, _hdfs_file); + } _hdfs_file = nullptr; if (ret != 0) { return Status::InternalError( "Write hdfs file failed. 
(BE: {}) namenode:{}, path:{}, err: {}", BackendOptions::get_localhost(), _fs_name, _path.native(), hdfs_error()); } + hdfs_file_created_total << 1; + return Status::OK(); +} + +static FileBlocksHolderPtr allocate_cache_holder(BlockFileCache* cache, UInt128Wrapper cache_hash, + size_t offset, uint64_t expiration_time, + bool is_cold) { + CacheContext ctx; + ctx.cache_type = expiration_time == 0 ? FileCacheType::NORMAL : FileCacheType::TTL; + ctx.expiration_time = expiration_time; + ctx.is_cold_data = is_cold; + auto holder = cache->get_or_set(cache_hash, offset, config::hdfs_write_batch_buffer_size, ctx); + return std::make_unique<FileBlocksHolder>(std::move(holder)); +} +// TODO(ByteYue): Refactor Upload Buffer to reduce this duplicate code +void HdfsFileWriter::_write_into_local_file_cache() { + auto _holder = + allocate_cache_holder(_cache, _cache_hash, _bytes_appended - _batch_buffer.size(), + _expiration_time, _is_cold_data); + size_t pos = 0; + size_t data_remain_size = _batch_buffer.size(); + for (auto& block : _holder->file_blocks) { + if (data_remain_size == 0) { + break; + } + size_t block_size = block->range().size(); + size_t append_size = std::min(data_remain_size, block_size); + if (block->state() == FileBlock::State::EMPTY) { + if (_index_offset != 0 && block->range().right >= _index_offset) { + static_cast<void>(block->change_cache_type_self(FileCacheType::INDEX)); + } + block->get_or_set_downloader(); + if (block->is_downloader()) { + Slice s(_batch_buffer.data() + pos, append_size); + Status st = block->append(s); + if (st.ok()) { + st = block->finalize(); + } + if (!st.ok()) { + LOG_WARNING("failed to append data to file cache").error(st); + } + } + } + data_remain_size -= append_size; + pos += append_size; + } + _batch_buffer.clear(); +} + +Status HdfsFileWriter::_write_into_batch(Slice data) { + while (!data.empty()) { + size_t append_size = std::min(config::hdfs_write_batch_buffer_size - _batch_buffer.size(), + data.get_size()); + 
std::string_view sv(data.get_data(), append_size); + _batch_buffer.append(sv); + data.remove_prefix(append_size); Review Comment: ```suggestion if (_buffer.full()) { // Impossible DCHECK(false) << _buffer.capacity() << ' ' << _buffer.size() << ' ' << data.size(); return Status::InternalError(...); } size_t bytes_written = _buffer.append(data.data, data.size); data.remove_prefix(bytes_written); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org