github-actions[bot] commented on code in PR #33796:
URL: https://github.com/apache/doris/pull/33796#discussion_r1568784661


##########
be/src/io/fs/hdfs_file_writer.cpp:
##########
@@ -51,32 +72,150 @@ HdfsFileWriter::~HdfsFileWriter() {
     } else {
         delete _hdfs_handler;
     }
+    hdfs_file_being_written << -1;
 }
 
 Status HdfsFileWriter::close() {
     if (_closed) {
         return Status::OK();
     }
     _closed = true;
-
+    if (_batch_buffer.size() != 0) {
+        RETURN_IF_ERROR(_write_into_batch());
+        if (_batch_buffer._write_file_cache) {
+            _write_into_local_file_cache();
+        }
+        _batch_buffer.clear();
+    }
+    int ret;
     if (_sync_file_data) {
-        int ret = hdfsHSync(_hdfs_handler->hdfs_fs, _hdfs_file);
+        {
+            SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_hsync_latency);
+            ret = hdfsHSync(_hdfs_handler->hdfs_fs, _hdfs_file);
+        }
+
         if (ret != 0) {
             return Status::InternalError("failed to sync hdfs file. fs_name={} path={} : {}",
                                          _fs_name, _path.native(), hdfs_error());
         }
     }
 
-    // The underlying implementation will invoke `hdfsHFlush` to flush buffered data and wait for
-    // the HDFS response, but won't guarantee the synchronization of data to HDFS.
-    int ret = hdfsCloseFile(_hdfs_handler->hdfs_fs, _hdfs_file);
+    {
+        SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_flush_latency);
+        // The underlying implementation will invoke `hdfsHFlush` to flush buffered data and wait for
+        // the HDFS response, but won't guarantee the synchronization of data to HDFS.
+        ret = hdfsFlush(_hdfs_handler->hdfs_fs, _hdfs_file);
+    }
+    if (ret == -1) {
+        std::stringstream ss;
+        ss << "failed to flush hdfs file. "
+           << "fs_name:" << _fs_name << " path:" << _path << ", err: " << 
hdfs_error();
+        LOG(WARNING) << ss.str();
+        return Status::InternalError(ss.str());
+    }
     _hdfs_file = nullptr;
     if (ret != 0) {
         return Status::InternalError(
                 "Write hdfs file failed. (BE: {}) namenode:{}, path:{}, err: 
{}",
                 BackendOptions::get_localhost(), _fs_name, _path.native(), 
hdfs_error());
     }
+    hdfs_file_created_total << 1;
+    return Status::OK();
+}
+
+HdfsFileWriter::CachedBatchBuffer::CachedBatchBuffer(size_t capacity)
+        : _capacity(capacity), _size(0) {
+    _batch_buffer.reserve(capacity);
+}
+
+bool HdfsFileWriter::CachedBatchBuffer::full() const {
+    return _size == _capacity;
+}
+
+const char* HdfsFileWriter::CachedBatchBuffer::data() const {
+    return _batch_buffer.data();
+}
+
+size_t HdfsFileWriter::CachedBatchBuffer::capacity() const {
+    return _capacity;
+}
+
+size_t HdfsFileWriter::CachedBatchBuffer::size() const {
+    return _size;
+}
 
+void HdfsFileWriter::CachedBatchBuffer::clear() {
+    _size = 0;
+    _batch_buffer.clear();
+}
+
+FileBlocksHolderPtr HdfsFileWriter::CachedBatchBuffer::allocate_cache_holder(size_t offset) {
+    CacheContext ctx;
+    ctx.cache_type = _expiration_time == 0 ? FileCacheType::NORMAL : FileCacheType::TTL;
+    ctx.expiration_time = _expiration_time;
+    ctx.is_cold_data = _is_cold_data;
+    auto holder = _cache->get_or_set(_cache_hash, offset, _capacity, ctx);
+    return std::make_unique<FileBlocksHolder>(std::move(holder));
+}
+
+// TODO(ByteYue): Refactor Upload Buffer to reduce this duplicate code
+void HdfsFileWriter::_write_into_local_file_cache() {
+    auto _holder = _batch_buffer.allocate_cache_holder(_bytes_appended - _batch_buffer.size());
+    size_t pos = 0;
+    size_t data_remain_size = _batch_buffer.size();
+    for (auto& block : _holder->file_blocks) {
+        if (data_remain_size == 0) {
+            break;
+        }
+        size_t block_size = block->range().size();
+        size_t append_size = std::min(data_remain_size, block_size);
+        if (block->state() == FileBlock::State::EMPTY) {
+            if (_index_offset != 0 && block->range().right >= _index_offset) {
+                static_cast<void>(block->change_cache_type_self(FileCacheType::INDEX));
+            }
+            block->get_or_set_downloader();
+            if (block->is_downloader()) {
+                Slice s(_batch_buffer.data() + pos, append_size);
+                Status st = block->append(s);
+                if (st.ok()) {
+                    st = block->finalize();
+                }
+                if (!st.ok()) {
+                    LOG_WARNING("failed to append data to file cache").error(st);
+                }
+            }
+        }
+        data_remain_size -= append_size;
+        pos += append_size;
+    }
+}
+
+void HdfsFileWriter::CachedBatchBuffer::append(Slice& data) {

Review Comment:
   warning: method 'append' can be made static [readability-convert-member-functions-to-static]
   
   be/src/io/fs/hdfs_file_writer.h:73:
   ```diff
   -         void append(Slice& s);
   +         static void append(Slice& s);
   ```
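   For context, `readability-convert-member-functions-to-static` fires when a member function's definition never uses `this` (no non-static data members and no non-static member calls), so the implicit object parameter is unnecessary. Whether that holds for `CachedBatchBuffer::append` cannot be seen from the hunk above, which stops at the signature. Below is a minimal, self-contained sketch of the pattern the check reports, using hypothetical names rather than the PR's code:
   
   ```cpp
   #include <cstddef>
   #include <vector>
   
   // Hypothetical buffer class, only to illustrate what the clang-tidy check reports.
   class BatchBufferSketch {
   public:
       // Uses no non-static members, so clang-tidy suggests declaring it static
       // (making it static would also mean dropping the now-meaningless const).
       bool exceeds_limit(std::size_t size) const { return size > kMaxBatchSize; }
   
       // Reads and writes _buf, so it must remain a non-static member function.
       void append(const char* data, std::size_t len) {
           _buf.insert(_buf.end(), data, data + len);
       }
   
   private:
       static constexpr std::size_t kMaxBatchSize = 4096;
       std::vector<char> _buf;
   };
   ```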
   



##########
be/src/io/fs/hdfs_file_writer.cpp:
##########
@@ -51,32 +72,150 @@ HdfsFileWriter::~HdfsFileWriter() {
     } else {
         delete _hdfs_handler;
     }
+    hdfs_file_being_written << -1;
 }
 
 Status HdfsFileWriter::close() {
     if (_closed) {
         return Status::OK();
     }
     _closed = true;
-
+    if (_batch_buffer.size() != 0) {
+        RETURN_IF_ERROR(_write_into_batch());
+        if (_batch_buffer._write_file_cache) {
+            _write_into_local_file_cache();
+        }
+        _batch_buffer.clear();
+    }
+    int ret;
     if (_sync_file_data) {
-        int ret = hdfsHSync(_hdfs_handler->hdfs_fs, _hdfs_file);
+        {
+            SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_hsync_latency);
+            ret = hdfsHSync(_hdfs_handler->hdfs_fs, _hdfs_file);
+        }
+
         if (ret != 0) {
             return Status::InternalError("failed to sync hdfs file. fs_name={} path={} : {}",
                                          _fs_name, _path.native(), hdfs_error());
         }
     }
 
-    // The underlying implementation will invoke `hdfsHFlush` to flush buffered data and wait for
-    // the HDFS response, but won't guarantee the synchronization of data to HDFS.
-    int ret = hdfsCloseFile(_hdfs_handler->hdfs_fs, _hdfs_file);
+    {
+        SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_flush_latency);
+        // The underlying implementation will invoke `hdfsHFlush` to flush buffered data and wait for
+        // the HDFS response, but won't guarantee the synchronization of data to HDFS.
+        ret = hdfsFlush(_hdfs_handler->hdfs_fs, _hdfs_file);
+    }
+    if (ret == -1) {
+        std::stringstream ss;
+        ss << "failed to flush hdfs file. "
+           << "fs_name:" << _fs_name << " path:" << _path << ", err: " << 
hdfs_error();
+        LOG(WARNING) << ss.str();
+        return Status::InternalError(ss.str());
+    }
     _hdfs_file = nullptr;
     if (ret != 0) {
         return Status::InternalError(
                 "Write hdfs file failed. (BE: {}) namenode:{}, path:{}, err: 
{}",
                 BackendOptions::get_localhost(), _fs_name, _path.native(), 
hdfs_error());
     }
+    hdfs_file_created_total << 1;
+    return Status::OK();
+}
+
+HdfsFileWriter::CachedBatchBuffer::CachedBatchBuffer(size_t capacity)
+        : _capacity(capacity), _size(0) {
+    _batch_buffer.reserve(capacity);
+}
+
+bool HdfsFileWriter::CachedBatchBuffer::full() const {
+    return _size == _capacity;
+}
+
+const char* HdfsFileWriter::CachedBatchBuffer::data() const {
+    return _batch_buffer.data();
+}
+
+size_t HdfsFileWriter::CachedBatchBuffer::capacity() const {
+    return _capacity;
+}
+
+size_t HdfsFileWriter::CachedBatchBuffer::size() const {
+    return _size;
+}
 
+void HdfsFileWriter::CachedBatchBuffer::clear() {
+    _size = 0;
+    _batch_buffer.clear();
+}
+
+FileBlocksHolderPtr HdfsFileWriter::CachedBatchBuffer::allocate_cache_holder(size_t offset) {
+    CacheContext ctx;
+    ctx.cache_type = _expiration_time == 0 ? FileCacheType::NORMAL : FileCacheType::TTL;
+    ctx.expiration_time = _expiration_time;
+    ctx.is_cold_data = _is_cold_data;
+    auto holder = _cache->get_or_set(_cache_hash, offset, _capacity, ctx);
+    return std::make_unique<FileBlocksHolder>(std::move(holder));
+}
+
+// TODO(ByteYue): Refactor Upload Buffer to reduce this duplicate code
+void HdfsFileWriter::_write_into_local_file_cache() {

Review Comment:
   warning: method '_write_into_local_file_cache' can be made static [readability-convert-member-functions-to-static]
   
   be/src/io/fs/hdfs_file_writer.h:54:
   ```diff
   -     void _write_into_local_file_cache();
   +     static void _write_into_local_file_cache();
   ```
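   The definition quoted above reads `_batch_buffer`, `_bytes_appended`, and `_index_offset`, so it cannot actually become static; if the diagnostic keeps appearing (for example, because it was produced against an earlier revision of the code), the usual way to silence only this finding is a `NOLINTNEXTLINE` marker at the reported location. A hypothetical stand-in class is sketched below; it is not the Doris header.
   
   ```cpp
   #include <cstddef>
   #include <vector>
   
   // Hypothetical writer class, only to show where the suppression comment goes.
   class CacheWriterSketch {
   public:
       // Depends on member state, so it must stay non-static; the marker below
       // suppresses readability-convert-member-functions-to-static at this
       // location if the check is ever reported spuriously for this method.
       // NOLINTNEXTLINE(readability-convert-member-functions-to-static)
       void write_into_local_file_cache() {
           _bytes_appended += _batch_buffer.size();
           _batch_buffer.clear();
       }
   
   private:
       std::vector<char> _batch_buffer;
       std::size_t _bytes_appended = 0;
   };
   ```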
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

