This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 1715bae26fd [opt](parquet-writer) Specify the row group size when writing data to Parquet files. (#35081) (#36042)
1715bae26fd is described below

commit 1715bae26fd8f6c24cf929a15f801bd05ac56898
Author: Mingyu Chen <morning...@163.com>
AuthorDate: Fri Jun 7 17:57:11 2024 +0800

    [opt](parquet-writer) Specify the row group size when writing data to Parquet files. (#35081) (#36042)
    
    bp #35081
    
    Co-authored-by: Tiewei Fang <43782773+bepppo...@users.noreply.github.com>
---
 be/src/vec/runtime/vparquet_transformer.cpp | 12 +++++++-----
 be/src/vec/runtime/vparquet_transformer.h   |  1 +
 2 files changed, 8 insertions(+), 5 deletions(-)

diff --git a/be/src/vec/runtime/vparquet_transformer.cpp b/be/src/vec/runtime/vparquet_transformer.cpp
index 8069487c009..77068adeebe 100644
--- a/be/src/vec/runtime/vparquet_transformer.cpp
+++ b/be/src/vec/runtime/vparquet_transformer.cpp
@@ -70,6 +70,8 @@
 
 namespace doris::vectorized {
 
+const uint64_t min_row_group_size = 128 * 1024 * 1024; // 128MB
+
 ParquetOutputStream::ParquetOutputStream(doris::io::FileWriter* file_writer)
         : _file_writer(file_writer), _cur_pos(0), _written_len(0) {
     set_mode(arrow::io::FileMode::WRITE);
@@ -292,12 +294,12 @@ Status VParquetTransformer::write(const Block& block) {
     RETURN_IF_ERROR(convert_to_arrow_batch(block, _arrow_schema, 
arrow::default_memory_pool(),
                                            &result, _state->timezone_obj()));
 
-    auto get_table_res = arrow::Table::FromRecordBatches(result->schema(), 
{result});
-    if (!get_table_res.ok()) {
-        return Status::InternalError("Error when get arrow table from record 
batchs");
+    RETURN_DORIS_STATUS_IF_ERROR(_writer->WriteRecordBatch(*result));
+    _write_size += block.bytes();
+    if (_write_size >= min_row_group_size) {
+        RETURN_DORIS_STATUS_IF_ERROR(_writer->NewBufferedRowGroup());
+        _write_size = 0;
     }
-    auto& table = get_table_res.ValueOrDie();
-    RETURN_DORIS_STATUS_IF_ERROR(_writer->WriteTable(*table, block.rows()));
     return Status::OK();
 }
 
diff --git a/be/src/vec/runtime/vparquet_transformer.h b/be/src/vec/runtime/vparquet_transformer.h
index ff1fa73e094..9eae25d8ac4 100644
--- a/be/src/vec/runtime/vparquet_transformer.h
+++ b/be/src/vec/runtime/vparquet_transformer.h
@@ -130,6 +130,7 @@ private:
     const bool _parquet_disable_dictionary;
     const TParquetVersion::type _parquet_version;
     const std::string* _iceberg_schema_json;
+    uint64_t _write_size = 0;
 };
 
 } // namespace doris::vectorized


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to