This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-1.1-lts in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-1.1-lts by this push:
     new 22ee53d1df [fix](parquet) fix write error data as parquet format. (#13004)
22ee53d1df is described below

commit 22ee53d1df4a06ef1671b672d1233ba629c8888a
Author: luozenglin <37725793+luozeng...@users.noreply.github.com>
AuthorDate: Tue Sep 27 13:28:53 2022 +0800

    [fix](parquet) fix write error data as parquet format. (#13004)

    Fix incorrect data conversion when writing tiny int and small int data to parquet files in non-vectorized engine.
---
 be/src/exec/parquet_writer.cpp | 46 ++++++++++++++++++++++++++++++++++------------
 be/src/exec/parquet_writer.h   |  3 +++
 2 files changed, 37 insertions(+), 12 deletions(-)

diff --git a/be/src/exec/parquet_writer.cpp b/be/src/exec/parquet_writer.cpp
index bf8be94097..8bbe5d326b 100644
--- a/be/src/exec/parquet_writer.cpp
+++ b/be/src/exec/parquet_writer.cpp
@@ -223,6 +223,17 @@ parquet::RowGroupWriter* ParquetWriterWrapper::get_rg_writer() {
     return _rg_writer;
 }
 
+template <typename T>
+void ParquetWriterWrapper::write_int32_column(int index, T* item) {
+    parquet::RowGroupWriter* rgWriter = get_rg_writer();
+    parquet::Int32Writer* col_writer = static_cast<parquet::Int32Writer*>(rgWriter->column(index));
+    int32_t value = 0;
+    if (item != nullptr) {
+        value = *item;
+    }
+    col_writer->WriteBatch(1, nullptr, nullptr, &value);
+}
+
 Status ParquetWriterWrapper::_write_one_row(TupleRow* row) {
     int num_columns = _output_expr_ctxs.size();
     if (num_columns != _str_schema.size()) {
@@ -250,8 +261,28 @@ Status ParquetWriterWrapper::_write_one_row(TupleRow* row) {
                 }
                 break;
             }
-            case TYPE_TINYINT:
-            case TYPE_SMALLINT:
+            case TYPE_TINYINT: {
+                if (_str_schema[index][1] != "int32") {
+                    std::stringstream ss;
+                    ss << "project field type is tiny int, should use int32, but the "
+                          "definition type of column "
+                       << _str_schema[index][2] << " is " << _str_schema[index][1];
+                    return Status::InvalidArgument(ss.str());
+                }
+                write_int32_column(index, static_cast<int8_t*>(item));
+                break;
+            }
+            case TYPE_SMALLINT: {
+                if (_str_schema[index][1] != "int32") {
+                    std::stringstream ss;
+                    ss << "project field type is small int, should use int32, but the "
+                          "definition type of column "
+                       << _str_schema[index][2] << " is " << _str_schema[index][1];
+                    return Status::InvalidArgument(ss.str());
+                }
+                write_int32_column(index, static_cast<int16_t*>(item));
+                break;
+            }
             case TYPE_INT: {
                 if (_str_schema[index][1] != "int32") {
                     std::stringstream ss;
@@ -260,16 +291,7 @@ Status ParquetWriterWrapper::_write_one_row(TupleRow* row) {
                        << _str_schema[index][2] << " is " << _str_schema[index][1];
                     return Status::InvalidArgument(ss.str());
                 }
-
-                parquet::RowGroupWriter* rgWriter = get_rg_writer();
-                parquet::Int32Writer* col_writer =
-                        static_cast<parquet::Int32Writer*>(rgWriter->column(index));
-                if (item != nullptr) {
-                    col_writer->WriteBatch(1, nullptr, nullptr, static_cast<int32_t*>(item));
-                } else {
-                    int32_t default_int32 = 0;
-                    col_writer->WriteBatch(1, nullptr, nullptr, &default_int32);
-                }
+                write_int32_column(index, static_cast<int32_t*>(item));
                 break;
             }
             case TYPE_BIGINT: {
diff --git a/be/src/exec/parquet_writer.h b/be/src/exec/parquet_writer.h
index c076aed941..effaf22757 100644
--- a/be/src/exec/parquet_writer.h
+++ b/be/src/exec/parquet_writer.h
@@ -93,6 +93,9 @@ public:
     int64_t written_len();
 
 private:
+    template <typename T>
+    void write_int32_column(int index, T* item);
+
     std::shared_ptr<ParquetOutputStream> _outstream;
     std::shared_ptr<parquet::WriterProperties> _properties;
     std::shared_ptr<parquet::schema::GroupNode> _schema;

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org