This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-1.1-lts
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-1.1-lts by this push:
     new 22ee53d1df [fix](parquet) fix write error data as parquet format. 
(#13004)
22ee53d1df is described below

commit 22ee53d1df4a06ef1671b672d1233ba629c8888a
Author: luozenglin <37725793+luozeng...@users.noreply.github.com>
AuthorDate: Tue Sep 27 13:28:53 2022 +0800

    [fix](parquet) fix write error data as parquet format. (#13004)
    
    Fix incorrect data conversion when writing tiny int and small int data
    to parquet files in non-vectorized engine.
---
 be/src/exec/parquet_writer.cpp | 46 +++++++++++++++++++++++++++++++-----------
 be/src/exec/parquet_writer.h   |  3 +++
 2 files changed, 37 insertions(+), 12 deletions(-)

diff --git a/be/src/exec/parquet_writer.cpp b/be/src/exec/parquet_writer.cpp
index bf8be94097..8bbe5d326b 100644
--- a/be/src/exec/parquet_writer.cpp
+++ b/be/src/exec/parquet_writer.cpp
@@ -223,6 +223,17 @@ parquet::RowGroupWriter* 
ParquetWriterWrapper::get_rg_writer() {
     return _rg_writer;
 }
 
+template <typename T>
+void ParquetWriterWrapper::write_int32_column(int index, T* item) {
+    parquet::RowGroupWriter* rgWriter = get_rg_writer();
+    parquet::Int32Writer* col_writer = 
static_cast<parquet::Int32Writer*>(rgWriter->column(index));
+    int32_t value = 0;
+    if (item != nullptr) {
+        value = *item;
+    }
+    col_writer->WriteBatch(1, nullptr, nullptr, &value);
+}
+
 Status ParquetWriterWrapper::_write_one_row(TupleRow* row) {
     int num_columns = _output_expr_ctxs.size();
     if (num_columns != _str_schema.size()) {
@@ -250,8 +261,28 @@ Status ParquetWriterWrapper::_write_one_row(TupleRow* row) 
{
                 }
                 break;
             }
-            case TYPE_TINYINT:
-            case TYPE_SMALLINT:
+            case TYPE_TINYINT: {
+                if (_str_schema[index][1] != "int32") {
+                    std::stringstream ss;
+                    ss << "project field type is tiny int, should use int32, 
but the "
+                          "definition type of column "
+                       << _str_schema[index][2] << " is " << 
_str_schema[index][1];
+                    return Status::InvalidArgument(ss.str());
+                }
+                write_int32_column(index, static_cast<int8_t*>(item));
+                break;
+            }
+            case TYPE_SMALLINT: {
+                if (_str_schema[index][1] != "int32") {
+                    std::stringstream ss;
+                    ss << "project field type is small int, should use int32, 
but the "
+                          "definition type of column "
+                       << _str_schema[index][2] << " is " << 
_str_schema[index][1];
+                    return Status::InvalidArgument(ss.str());
+                }
+                write_int32_column(index, static_cast<int16_t*>(item));
+                break;
+            }
             case TYPE_INT: {
                 if (_str_schema[index][1] != "int32") {
                     std::stringstream ss;
@@ -260,16 +291,7 @@ Status ParquetWriterWrapper::_write_one_row(TupleRow* row) 
{
                        << _str_schema[index][2] << " is " << 
_str_schema[index][1];
                     return Status::InvalidArgument(ss.str());
                 }
-
-                parquet::RowGroupWriter* rgWriter = get_rg_writer();
-                parquet::Int32Writer* col_writer =
-                        
static_cast<parquet::Int32Writer*>(rgWriter->column(index));
-                if (item != nullptr) {
-                    col_writer->WriteBatch(1, nullptr, nullptr, 
static_cast<int32_t*>(item));
-                } else {
-                    int32_t default_int32 = 0;
-                    col_writer->WriteBatch(1, nullptr, nullptr, 
&default_int32);
-                }
+                write_int32_column(index, static_cast<int32_t*>(item));
                 break;
             }
             case TYPE_BIGINT: {
diff --git a/be/src/exec/parquet_writer.h b/be/src/exec/parquet_writer.h
index c076aed941..effaf22757 100644
--- a/be/src/exec/parquet_writer.h
+++ b/be/src/exec/parquet_writer.h
@@ -93,6 +93,9 @@ public:
     int64_t written_len();
 
 private:
+    template <typename T>
+    void write_int32_column(int index, T* item);
+
     std::shared_ptr<ParquetOutputStream> _outstream;
     std::shared_ptr<parquet::WriterProperties> _properties;
     std::shared_ptr<parquet::schema::GroupNode> _schema;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to