This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 1715bae26fd [opt](parquet-writer) Specify the row group size when writing data to Parquet files. (#35081) (#36042)
1715bae26fd is described below

commit 1715bae26fd8f6c24cf929a15f801bd05ac56898
Author: Mingyu Chen <morning...@163.com>
AuthorDate: Fri Jun 7 17:57:11 2024 +0800

    [opt](parquet-writer) Specify the row group size when writing data to Parquet files. (#35081) (#36042)

    bp #35081

    Co-authored-by: Tiewei Fang <43782773+bepppo...@users.noreply.github.com>
---
 be/src/vec/runtime/vparquet_transformer.cpp | 12 +++++++-----
 be/src/vec/runtime/vparquet_transformer.h   |  1 +
 2 files changed, 8 insertions(+), 5 deletions(-)

diff --git a/be/src/vec/runtime/vparquet_transformer.cpp b/be/src/vec/runtime/vparquet_transformer.cpp
index 8069487c009..77068adeebe 100644
--- a/be/src/vec/runtime/vparquet_transformer.cpp
+++ b/be/src/vec/runtime/vparquet_transformer.cpp
@@ -70,6 +70,8 @@
 
 namespace doris::vectorized {
 
+const uint64_t min_row_group_size = 128 * 1024 * 1024; // 128MB
+
 ParquetOutputStream::ParquetOutputStream(doris::io::FileWriter* file_writer)
         : _file_writer(file_writer), _cur_pos(0), _written_len(0) {
     set_mode(arrow::io::FileMode::WRITE);
@@ -292,12 +294,12 @@ Status VParquetTransformer::write(const Block& block) {
     RETURN_IF_ERROR(convert_to_arrow_batch(block, _arrow_schema, arrow::default_memory_pool(),
                                            &result, _state->timezone_obj()));
 
-    auto get_table_res = arrow::Table::FromRecordBatches(result->schema(), {result});
-    if (!get_table_res.ok()) {
-        return Status::InternalError("Error when get arrow table from record batchs");
+    RETURN_DORIS_STATUS_IF_ERROR(_writer->WriteRecordBatch(*result));
+    _write_size += block.bytes();
+    if (_write_size >= min_row_group_size) {
+        RETURN_DORIS_STATUS_IF_ERROR(_writer->NewBufferedRowGroup());
+        _write_size = 0;
     }
-    auto& table = get_table_res.ValueOrDie();
-    RETURN_DORIS_STATUS_IF_ERROR(_writer->WriteTable(*table, block.rows()));
     return Status::OK();
 }
 
diff --git a/be/src/vec/runtime/vparquet_transformer.h b/be/src/vec/runtime/vparquet_transformer.h
index ff1fa73e094..9eae25d8ac4 100644
--- a/be/src/vec/runtime/vparquet_transformer.h
+++ b/be/src/vec/runtime/vparquet_transformer.h
@@ -130,6 +130,7 @@ private:
     const bool _parquet_disable_dictionary;
     const TParquetVersion::type _parquet_version;
     const std::string* _iceberg_schema_json;
+    uint64_t _write_size = 0;
 };
 
 } // namespace doris::vectorized


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org