gavinchou commented on code in PR #33796:
URL: https://github.com/apache/doris/pull/33796#discussion_r1569149086


##########
be/src/common/config.cpp:
##########
@@ -1028,6 +1028,7 @@ DEFINE_mInt64(s3_write_buffer_size, "5242880");
 // The timeout config for S3 buffer allocation
 DEFINE_mInt32(s3_writer_buffer_allocation_timeout, "300");
 DEFINE_mInt64(file_cache_max_file_reader_cache_size, "1000000");
+DEFINE_mInt64(hdfs_write_batch_buffer_size, "4194304"); // 4MB

Review Comment:
   This value must be a multiple of `file_cache_each_block_size` (1MB),
   and this should be checked; otherwise there may be correctness issues.
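
   A minimal sketch of such a startup check, written against assumed names
   (`kFileCacheEachBlockSize` and the free function below are illustrative,
   not the actual Doris config-validation API):

   ```cpp
   #include <cstdint>

   // Assumption: mirrors file_cache_each_block_size (1MB).
   constexpr int64_t kFileCacheEachBlockSize = 1 << 20;

   // A flushed batch must cover whole cache blocks, so the batch buffer
   // size has to be a positive multiple of the cache block size.
   bool is_valid_hdfs_write_batch_buffer_size(int64_t batch_buffer_size) {
       return batch_buffer_size > 0 &&
              batch_buffer_size % kFileCacheEachBlockSize == 0;
   }
   ```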



##########
be/src/io/fs/hdfs_file_writer.cpp:
##########
@@ -26,23 +26,44 @@
 
 #include "common/logging.h"
 #include "common/status.h"
+#include "io/cache/block_file_cache.h"
+#include "io/cache/block_file_cache_factory.h"
+#include "io/cache/file_cache_common.h"
 #include "io/fs/err_utils.h"
 #include "io/fs/hdfs_file_system.h"
 #include "io/hdfs_util.h"
 #include "service/backend_options.h"
+#include "util/bvar_helper.h"
 
 namespace doris::io {
 
+bvar::Adder<uint64_t> hdfs_file_writer_total("hdfs_file_writer", "total_num");

Review Comment:
   Use a single word, "hdfs_file_writer_total_num", instead.



##########
be/src/io/fs/hdfs_file_writer.cpp:
##########
@@ -51,32 +73,159 @@ HdfsFileWriter::~HdfsFileWriter() {
     } else {
         delete _hdfs_handler;
     }
+    hdfs_file_being_written << -1;
 }
 
 Status HdfsFileWriter::close() {
     if (_closed) {
         return Status::OK();
     }
     _closed = true;
-
+    if (_batch_buffer.size() != 0) {
+        RETURN_IF_ERROR(_flush_buffer());
+    }
+    int ret;
     if (_sync_file_data) {
-        int ret = hdfsHSync(_hdfs_handler->hdfs_fs, _hdfs_file);
+        {
+            SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_hsync_latency);
+            ret = hdfsHSync(_hdfs_handler->hdfs_fs, _hdfs_file);
+        }
+
         if (ret != 0) {
             return Status::InternalError("failed to sync hdfs file. fs_name={} 
path={} : {}",
                                          _fs_name, _path.native(), 
hdfs_error());
         }
     }
 
-    // The underlying implementation will invoke `hdfsHFlush` to flush buffered data and wait for
-    // the HDFS response, but won't guarantee the synchronization of data to HDFS.
-    int ret = hdfsCloseFile(_hdfs_handler->hdfs_fs, _hdfs_file);
+    {
+        SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_flush_latency);
+        // The underlying implementation will invoke `hdfsHFlush` to flush buffered data and wait for
+        // the HDFS response, but won't guarantee the synchronization of data to HDFS.
+        ret = hdfsCloseFile(_hdfs_handler->hdfs_fs, _hdfs_file);
+    }
     _hdfs_file = nullptr;
     if (ret != 0) {
         return Status::InternalError(
                 "Write hdfs file failed. (BE: {}) namenode:{}, path:{}, err: 
{}",
                 BackendOptions::get_localhost(), _fs_name, _path.native(), 
hdfs_error());
     }
+    hdfs_file_created_total << 1;
+    return Status::OK();
+}
+
+HdfsFileWriter::CachedBatchBuffer::CachedBatchBuffer(size_t capacity)
+        : _is_cold_data(false), _write_file_cache(false), _expiration_time(0) {
+    _batch_buffer.reserve(capacity);
+}
+
+bool HdfsFileWriter::CachedBatchBuffer::full() const {
+    return size() == capacity();
+}
+
+const char* HdfsFileWriter::CachedBatchBuffer::data() const {
+    return _batch_buffer.data();
+}
+
+size_t HdfsFileWriter::CachedBatchBuffer::capacity() const {
+    return _batch_buffer.capacity();
+}
+
+size_t HdfsFileWriter::CachedBatchBuffer::size() const {
+    return _batch_buffer.size();
+}
+
+void HdfsFileWriter::CachedBatchBuffer::clear() {
+    _batch_buffer.clear();
+}
+
+FileBlocksHolderPtr HdfsFileWriter::CachedBatchBuffer::allocate_cache_holder(size_t offset) {
+    CacheContext ctx;
+    ctx.cache_type = _expiration_time == 0 ? FileCacheType::NORMAL : FileCacheType::TTL;
+    ctx.expiration_time = _expiration_time;
+    ctx.is_cold_data = _is_cold_data;
+    auto holder = _cache->get_or_set(_cache_hash, offset, capacity(), ctx);
+    return std::make_unique<FileBlocksHolder>(std::move(holder));
+}
+
+// TODO(ByteYue): Refactor Upload Buffer to reduce this duplicate code
+void HdfsFileWriter::_write_into_local_file_cache() {
+    auto _holder = _batch_buffer.allocate_cache_holder(_bytes_appended - _batch_buffer.size());

Review Comment:
   `_holder` -> `holder`



##########
be/src/io/fs/hdfs_file_writer.cpp:
##########
@@ -51,32 +73,159 @@ HdfsFileWriter::~HdfsFileWriter() {
     } else {
         delete _hdfs_handler;
     }
+    hdfs_file_being_written << -1;
 }
 
 Status HdfsFileWriter::close() {
     if (_closed) {
         return Status::OK();
     }
     _closed = true;
-
+    if (_batch_buffer.size() != 0) {
+        RETURN_IF_ERROR(_flush_buffer());
+    }
+    int ret;
     if (_sync_file_data) {
-        int ret = hdfsHSync(_hdfs_handler->hdfs_fs, _hdfs_file);
+        {
+            SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_hsync_latency);
+            ret = hdfsHSync(_hdfs_handler->hdfs_fs, _hdfs_file);
+        }
+
         if (ret != 0) {
             return Status::InternalError("failed to sync hdfs file. fs_name={} 
path={} : {}",
                                          _fs_name, _path.native(), 
hdfs_error());
         }
     }
 
-    // The underlying implementation will invoke `hdfsHFlush` to flush buffered data and wait for
-    // the HDFS response, but won't guarantee the synchronization of data to HDFS.
-    int ret = hdfsCloseFile(_hdfs_handler->hdfs_fs, _hdfs_file);
+    {
+        SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_flush_latency);
+        // The underlying implementation will invoke `hdfsHFlush` to flush buffered data and wait for
+        // the HDFS response, but won't guarantee the synchronization of data to HDFS.
+        ret = hdfsCloseFile(_hdfs_handler->hdfs_fs, _hdfs_file);
+    }
     _hdfs_file = nullptr;
     if (ret != 0) {
         return Status::InternalError(
                 "Write hdfs file failed. (BE: {}) namenode:{}, path:{}, err: 
{}",
                 BackendOptions::get_localhost(), _fs_name, _path.native(), 
hdfs_error());
     }
+    hdfs_file_created_total << 1;
+    return Status::OK();
+}
+
+HdfsFileWriter::CachedBatchBuffer::CachedBatchBuffer(size_t capacity)
+        : _is_cold_data(false), _write_file_cache(false), _expiration_time(0) {
+    _batch_buffer.reserve(capacity);
+}
+
+bool HdfsFileWriter::CachedBatchBuffer::full() const {
+    return size() == capacity();
+}
+
+const char* HdfsFileWriter::CachedBatchBuffer::data() const {
+    return _batch_buffer.data();
+}
+
+size_t HdfsFileWriter::CachedBatchBuffer::capacity() const {
+    return _batch_buffer.capacity();
+}
+
+size_t HdfsFileWriter::CachedBatchBuffer::size() const {
+    return _batch_buffer.size();
+}
+
+void HdfsFileWriter::CachedBatchBuffer::clear() {
+    _batch_buffer.clear();
+}
+
+FileBlocksHolderPtr HdfsFileWriter::CachedBatchBuffer::allocate_cache_holder(size_t offset) {
+    CacheContext ctx;
+    ctx.cache_type = _expiration_time == 0 ? FileCacheType::NORMAL : FileCacheType::TTL;
+    ctx.expiration_time = _expiration_time;
+    ctx.is_cold_data = _is_cold_data;
+    auto holder = _cache->get_or_set(_cache_hash, offset, capacity(), ctx);
+    return std::make_unique<FileBlocksHolder>(std::move(holder));
+}
+
+// TODO(ByteYue): Refactor Upload Buffer to reduce this duplicate code
+void HdfsFileWriter::_write_into_local_file_cache() {
+    auto _holder = _batch_buffer.allocate_cache_holder(_bytes_appended - _batch_buffer.size());
+    size_t pos = 0;
+    size_t data_remain_size = _batch_buffer.size();
+    for (auto& block : _holder->file_blocks) {
+        if (data_remain_size == 0) {
+            break;
+        }
+        size_t block_size = block->range().size();
+        size_t append_size = std::min(data_remain_size, block_size);
+        if (block->state() == FileBlock::State::EMPTY) {
+            if (_index_offset != 0 && block->range().right >= _index_offset) {
+                static_cast<void>(block->change_cache_type_self(FileCacheType::INDEX));
+            }
+            block->get_or_set_downloader();
+            if (block->is_downloader()) {
+                Slice s(_batch_buffer.data() + pos, append_size);
+                Status st = block->append(s);
+                if (st.ok()) {
+                    st = block->finalize();
+                }
+                if (!st.ok()) {
+                    LOG_WARNING("failed to append data to file cache").error(st);
+                }
+            }
+        }
+        data_remain_size -= append_size;
+        pos += append_size;
+    }
+}
+
+Status HdfsFileWriter::append_hdfs_file(std::string_view content) {
+    while (!content.empty()) {
+        int64_t written_bytes;
+        {
+            SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_write_latency);
+            written_bytes =
+                    hdfsWrite(_hdfs_handler->hdfs_fs, _hdfs_file, content.data(), content.size());
+        }
+        if (written_bytes < 0) {
+            return Status::InternalError("write hdfs failed. fs_name: {}, path: {}, error: {}",
+                                         _fs_name, _path.native(), hdfs_error());
+        }
+        hdfs_bytes_written_total << written_bytes;
+        content.remove_prefix(written_bytes);
+    }
+    return Status::OK();
+}
 
+Status HdfsFileWriter::_flush_buffer() {
+    RETURN_IF_ERROR(append_hdfs_file(_batch_buffer._batch_buffer));
+    if (_batch_buffer._write_file_cache) {
+        _write_into_local_file_cache();
+    }
+    _batch_buffer.clear();
+    return Status::OK();
+}
+
+size_t HdfsFileWriter::CachedBatchBuffer::append(std::string_view content) {
+    size_t append_size = std::min(capacity() - size(), content.size());
+    _batch_buffer.append(content.substr(0, append_size));
+    return append_size;
+}
+
+Status HdfsFileWriter::_append(std::string_view content) {
+    while (!content.empty()) {
+        if (_batch_buffer.full()) {
+            auto error_msg = fmt::format("invalid batch buffer status, capacity {}, size {}",
+                                         _batch_buffer.capacity(), _batch_buffer.size());
+            DCHECK(false) << error_msg;
+            return Status::InternalError(error_msg);
+        }
+        size_t append_size = _batch_buffer.append(content);
+        content.remove_prefix(append_size);
+        if (_batch_buffer.full()) {
+            RETURN_IF_ERROR(_flush_buffer());

Review Comment:
   Add a comment noting that `_flush_buffer` will clear the buffer.
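
   For example (a sketch of the requested comment, placed in the `_append`
   loop shown above):

   ```cpp
   size_t append_size = _batch_buffer.append(content);
   content.remove_prefix(append_size);
   if (_batch_buffer.full()) {
       // _flush_buffer() writes the batch to HDFS (and optionally the file
       // cache) and then clears _batch_buffer, so the buffer is empty again
       // for the next loop iteration.
       RETURN_IF_ERROR(_flush_buffer());
   }
   ```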



##########
be/src/io/fs/hdfs_file_writer.cpp:
##########
@@ -51,32 +73,159 @@ HdfsFileWriter::~HdfsFileWriter() {
     } else {
         delete _hdfs_handler;
     }
+    hdfs_file_being_written << -1;
 }
 
 Status HdfsFileWriter::close() {
     if (_closed) {
         return Status::OK();
     }
     _closed = true;
-
+    if (_batch_buffer.size() != 0) {
+        RETURN_IF_ERROR(_flush_buffer());
+    }
+    int ret;
     if (_sync_file_data) {
-        int ret = hdfsHSync(_hdfs_handler->hdfs_fs, _hdfs_file);
+        {
+            SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_hsync_latency);
+            ret = hdfsHSync(_hdfs_handler->hdfs_fs, _hdfs_file);
+        }
+
         if (ret != 0) {
             return Status::InternalError("failed to sync hdfs file. fs_name={} 
path={} : {}",
                                          _fs_name, _path.native(), 
hdfs_error());
         }
     }
 
-    // The underlying implementation will invoke `hdfsHFlush` to flush buffered data and wait for
-    // the HDFS response, but won't guarantee the synchronization of data to HDFS.
-    int ret = hdfsCloseFile(_hdfs_handler->hdfs_fs, _hdfs_file);
+    {
+        SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_flush_latency);
+        // The underlying implementation will invoke `hdfsHFlush` to flush buffered data and wait for
+        // the HDFS response, but won't guarantee the synchronization of data to HDFS.
+        ret = hdfsCloseFile(_hdfs_handler->hdfs_fs, _hdfs_file);
+    }
     _hdfs_file = nullptr;
     if (ret != 0) {
         return Status::InternalError(
                 "Write hdfs file failed. (BE: {}) namenode:{}, path:{}, err: 
{}",
                 BackendOptions::get_localhost(), _fs_name, _path.native(), 
hdfs_error());
     }
+    hdfs_file_created_total << 1;
+    return Status::OK();
+}
+
+HdfsFileWriter::CachedBatchBuffer::CachedBatchBuffer(size_t capacity)
+        : _is_cold_data(false), _write_file_cache(false), _expiration_time(0) {
+    _batch_buffer.reserve(capacity);
+}
+
+bool HdfsFileWriter::CachedBatchBuffer::full() const {
+    return size() == capacity();
+}
+
+const char* HdfsFileWriter::CachedBatchBuffer::data() const {
+    return _batch_buffer.data();
+}
+
+size_t HdfsFileWriter::CachedBatchBuffer::capacity() const {
+    return _batch_buffer.capacity();
+}
+
+size_t HdfsFileWriter::CachedBatchBuffer::size() const {
+    return _batch_buffer.size();
+}
+
+void HdfsFileWriter::CachedBatchBuffer::clear() {
+    _batch_buffer.clear();
+}
+
+FileBlocksHolderPtr HdfsFileWriter::CachedBatchBuffer::allocate_cache_holder(size_t offset) {
+    CacheContext ctx;
+    ctx.cache_type = _expiration_time == 0 ? FileCacheType::NORMAL : FileCacheType::TTL;
+    ctx.expiration_time = _expiration_time;
+    ctx.is_cold_data = _is_cold_data;
+    auto holder = _cache->get_or_set(_cache_hash, offset, capacity(), ctx);
+    return std::make_unique<FileBlocksHolder>(std::move(holder));
+}
+
+// TODO(ByteYue): Refactor Upload Buffer to reduce this duplicate code
+void HdfsFileWriter::_write_into_local_file_cache() {
+    auto _holder = _batch_buffer.allocate_cache_holder(_bytes_appended - _batch_buffer.size());
+    size_t pos = 0;
+    size_t data_remain_size = _batch_buffer.size();
+    for (auto& block : _holder->file_blocks) {
+        if (data_remain_size == 0) {
+            break;
+        }
+        size_t block_size = block->range().size();
+        size_t append_size = std::min(data_remain_size, block_size);
+        if (block->state() == FileBlock::State::EMPTY) {
+            if (_index_offset != 0 && block->range().right >= _index_offset) {
+                static_cast<void>(block->change_cache_type_self(FileCacheType::INDEX));
+            }
+            block->get_or_set_downloader();
+            if (block->is_downloader()) {
+                Slice s(_batch_buffer.data() + pos, append_size);
+                Status st = block->append(s);
+                if (st.ok()) {
+                    st = block->finalize();
+                }
+                if (!st.ok()) {
+                    LOG_WARNING("failed to append data to file cache").error(st);
+                }
+            }
+        }
+        data_remain_size -= append_size;
+        pos += append_size;
+    }
+}
+
+Status HdfsFileWriter::append_hdfs_file(std::string_view content) {
+    while (!content.empty()) {
+        int64_t written_bytes;
+        {
+            SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_write_latency);
+            written_bytes =
+                    hdfsWrite(_hdfs_handler->hdfs_fs, _hdfs_file, content.data(), content.size());
+        }
+        if (written_bytes < 0) {
+            return Status::InternalError("write hdfs failed. fs_name: {}, path: {}, error: {}",
+                                         _fs_name, _path.native(), hdfs_error());
+        }
+        hdfs_bytes_written_total << written_bytes;
+        content.remove_prefix(written_bytes);
+    }
+    return Status::OK();
+}
 
+Status HdfsFileWriter::_flush_buffer() {
+    RETURN_IF_ERROR(append_hdfs_file(_batch_buffer._batch_buffer));
+    if (_batch_buffer._write_file_cache) {
+        _write_into_local_file_cache();
+    }
+    _batch_buffer.clear();
+    return Status::OK();
+}
+
+size_t HdfsFileWriter::CachedBatchBuffer::append(std::string_view content) {
+    size_t append_size = std::min(capacity() - size(), content.size());
+    _batch_buffer.append(content.substr(0, append_size));

Review Comment:
   maybe `buf.append(content.data(), append_size)` is enough.
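
   A minimal sketch of that form, assuming `_batch_buffer` is a `std::string`
   (the two-argument `append(const char*, size_t)` overload copies exactly
   `append_size` bytes, making the intermediate `substr` view unnecessary):

   ```cpp
   size_t HdfsFileWriter::CachedBatchBuffer::append(std::string_view content) {
       size_t append_size = std::min(capacity() - size(), content.size());
       _batch_buffer.append(content.data(), append_size);
       return append_size;
   }
   ```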



##########
be/src/io/fs/hdfs_file_writer.cpp:
##########
@@ -51,32 +73,159 @@ HdfsFileWriter::~HdfsFileWriter() {
     } else {
         delete _hdfs_handler;
     }
+    hdfs_file_being_written << -1;
 }
 
 Status HdfsFileWriter::close() {
     if (_closed) {
         return Status::OK();
     }
     _closed = true;
-
+    if (_batch_buffer.size() != 0) {
+        RETURN_IF_ERROR(_flush_buffer());
+    }
+    int ret;
     if (_sync_file_data) {
-        int ret = hdfsHSync(_hdfs_handler->hdfs_fs, _hdfs_file);
+        {
+            SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_hsync_latency);
+            ret = hdfsHSync(_hdfs_handler->hdfs_fs, _hdfs_file);
+        }
+
         if (ret != 0) {
             return Status::InternalError("failed to sync hdfs file. fs_name={} 
path={} : {}",
                                          _fs_name, _path.native(), 
hdfs_error());
         }
     }
 
-    // The underlying implementation will invoke `hdfsHFlush` to flush buffered data and wait for
-    // the HDFS response, but won't guarantee the synchronization of data to HDFS.
-    int ret = hdfsCloseFile(_hdfs_handler->hdfs_fs, _hdfs_file);
+    {
+        SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_flush_latency);
+        // The underlying implementation will invoke `hdfsHFlush` to flush buffered data and wait for
+        // the HDFS response, but won't guarantee the synchronization of data to HDFS.
+        ret = hdfsCloseFile(_hdfs_handler->hdfs_fs, _hdfs_file);
+    }
     _hdfs_file = nullptr;
     if (ret != 0) {
         return Status::InternalError(
                 "Write hdfs file failed. (BE: {}) namenode:{}, path:{}, err: 
{}",
                 BackendOptions::get_localhost(), _fs_name, _path.native(), 
hdfs_error());
     }
+    hdfs_file_created_total << 1;
+    return Status::OK();
+}
+
+HdfsFileWriter::CachedBatchBuffer::CachedBatchBuffer(size_t capacity)
+        : _is_cold_data(false), _write_file_cache(false), _expiration_time(0) {
+    _batch_buffer.reserve(capacity);
+}
+
+bool HdfsFileWriter::CachedBatchBuffer::full() const {
+    return size() == capacity();
+}
+
+const char* HdfsFileWriter::CachedBatchBuffer::data() const {
+    return _batch_buffer.data();
+}
+
+size_t HdfsFileWriter::CachedBatchBuffer::capacity() const {
+    return _batch_buffer.capacity();
+}
+
+size_t HdfsFileWriter::CachedBatchBuffer::size() const {
+    return _batch_buffer.size();
+}
+
+void HdfsFileWriter::CachedBatchBuffer::clear() {
+    _batch_buffer.clear();
+}
+
+FileBlocksHolderPtr HdfsFileWriter::CachedBatchBuffer::allocate_cache_holder(size_t offset) {
+    CacheContext ctx;
+    ctx.cache_type = _expiration_time == 0 ? FileCacheType::NORMAL : FileCacheType::TTL;
+    ctx.expiration_time = _expiration_time;
+    ctx.is_cold_data = _is_cold_data;
+    auto holder = _cache->get_or_set(_cache_hash, offset, capacity(), ctx);
+    return std::make_unique<FileBlocksHolder>(std::move(holder));
+}
+
+// TODO(ByteYue): Refactor Upload Buffer to reduce this duplicate code
+void HdfsFileWriter::_write_into_local_file_cache() {
+    auto _holder = _batch_buffer.allocate_cache_holder(_bytes_appended - _batch_buffer.size());
+    size_t pos = 0;
+    size_t data_remain_size = _batch_buffer.size();
+    for (auto& block : _holder->file_blocks) {
+        if (data_remain_size == 0) {
+            break;
+        }
+        size_t block_size = block->range().size();
+        size_t append_size = std::min(data_remain_size, block_size);
+        if (block->state() == FileBlock::State::EMPTY) {
+            if (_index_offset != 0 && block->range().right >= _index_offset) {

Review Comment:
   Where is `_index_offset` initialized, and how should it be initialized?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

