github-actions[bot] commented on code in PR #33796: URL: https://github.com/apache/doris/pull/33796#discussion_r1570020808
########## be/src/io/fs/hdfs_file_writer.cpp: ########## @@ -51,32 +81,150 @@ HdfsFileWriter::~HdfsFileWriter() { } else { delete _hdfs_handler; } + hdfs_file_being_written << -1; } Status HdfsFileWriter::close() { if (_closed) { return Status::OK(); } _closed = true; - + if (_batch_buffer.size() != 0) { + RETURN_IF_ERROR(_flush_buffer()); + } + int ret; if (_sync_file_data) { - int ret = hdfsHSync(_hdfs_handler->hdfs_fs, _hdfs_file); + { + SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_hsync_latency); + ret = hdfsHSync(_hdfs_handler->hdfs_fs, _hdfs_file); + } + if (ret != 0) { return Status::InternalError("failed to sync hdfs file. fs_name={} path={} : {}", _fs_name, _path.native(), hdfs_error()); } } - // The underlying implementation will invoke `hdfsHFlush` to flush buffered data and wait for - // the HDFS response, but won't guarantee the synchronization of data to HDFS. - int ret = hdfsCloseFile(_hdfs_handler->hdfs_fs, _hdfs_file); + { + SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_flush_latency); + // The underlying implementation will invoke `hdfsHFlush` to flush buffered data and wait for + // the HDFS response, but won't guarantee the synchronization of data to HDFS. + ret = hdfsCloseFile(_hdfs_handler->hdfs_fs, _hdfs_file); + } _hdfs_file = nullptr; if (ret != 0) { return Status::InternalError( "Write hdfs file failed. (BE: {}) namenode:{}, path:{}, err: {}", BackendOptions::get_localhost(), _fs_name, _path.native(), hdfs_error()); } + hdfs_file_created_total << 1; + return Status::OK(); +} +HdfsFileWriter::BatchBuffer::BatchBuffer(size_t capacity) { + _batch_buffer.reserve(capacity); +} + +bool HdfsFileWriter::BatchBuffer::full() const { + return size() == capacity(); +} + +const char* HdfsFileWriter::BatchBuffer::data() const { + return _batch_buffer.data(); +} + +size_t HdfsFileWriter::BatchBuffer::capacity() const { + return _batch_buffer.capacity(); +} + +size_t HdfsFileWriter::BatchBuffer::size() const { + return _batch_buffer.size(); +} + +void HdfsFileWriter::BatchBuffer::clear() { + _batch_buffer.clear(); +} + +// TODO(ByteYue): Refactor Upload Buffer to reduce this duplicate code +void HdfsFileWriter::_write_into_local_file_cache() { + auto holder = _cache_builder.allocate_cache_holder(_bytes_appended - _batch_buffer.size(), + _batch_buffer.capacity()); + size_t pos = 0; + size_t data_remain_size = _batch_buffer.size(); + for (auto& block : holder->file_blocks) { + if (data_remain_size == 0) { + break; + } + size_t block_size = block->range().size(); + size_t append_size = std::min(data_remain_size, block_size); + if (block->state() == FileBlock::State::EMPTY) { + if (_index_offset != 0 && block->range().right >= _index_offset) { + static_cast<void>(block->change_cache_type_self(FileCacheType::INDEX)); + } + block->get_or_set_downloader(); + if (block->is_downloader()) { + Slice s(_batch_buffer.data() + pos, append_size); + Status st = block->append(s); + if (st.ok()) { + st = block->finalize(); + } + if (!st.ok()) { + LOG_WARNING("failed to append data to file cache").error(st); + } + } + } + data_remain_size -= append_size; + pos += append_size; + } +} + +Status HdfsFileWriter::append_hdfs_file(std::string_view content) { + while (!content.empty()) { + int64_t written_bytes; + { + SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_write_latency); + written_bytes = hdfsWrite(_hdfs_handler->hdfs_fs, _hdfs_file, content.data(), content.size()); + } + if (written_bytes < 0) { + return Status::InternalError("write hdfs failed. fs_name: {}, path: {}, error: {}", + _fs_name, _path.native(), hdfs_error()); + } + hdfs_bytes_written_total << written_bytes; + content.remove_prefix(written_bytes); + } + return Status::OK(); +} + +Status HdfsFileWriter::_flush_buffer() { + RETURN_IF_ERROR(append_hdfs_file(_batch_buffer._batch_buffer)); + if (_cache_builder._write_file_cache) { + _write_into_local_file_cache(); + } + _batch_buffer.clear(); + return Status::OK(); +} + +size_t HdfsFileWriter::BatchBuffer::append(std::string_view content) { Review Comment: warning: method 'append' can be made static [readability-convert-member-functions-to-static] ```suggestion static size_t HdfsFileWriter::BatchBuffer::append(std::string_view content) { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org