This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 9d7bc5b7652 [pick](branch-2.1) pick #38215 (#43386)
9d7bc5b7652 is described below

commit 9d7bc5b765259f101da836126ce48cb8f7724b22
Author: Xinyi Zou <zouxiny...@gmail.com>
AuthorDate: Sat Nov 9 22:13:05 2024 +0800

    [pick](branch-2.1) pick #38215 (#43386)
    
    pick #38215
    
    ---------
    
    Co-authored-by: Zou Xinyi <zouxi...@selectdb.com>
---
 be/src/pipeline/exec/result_sink_operator.cpp      |  5 ++-
 be/src/util/arrow/row_batch.cpp                    | 37 ++++++++++++----------
 be/src/util/arrow/row_batch.h                      | 11 ++++---
 be/src/vec/runtime/vparquet_transformer.cpp        |  3 +-
 be/src/vec/sink/vmemory_scratch_sink.cpp           |  2 +-
 be/src/vec/sink/vresult_sink.cpp                   |  4 ++-
 .../serde/data_type_serde_arrow_test.cpp           |  4 +--
 7 files changed, 40 insertions(+), 26 deletions(-)

diff --git a/be/src/pipeline/exec/result_sink_operator.cpp b/be/src/pipeline/exec/result_sink_operator.cpp
index 1114bdd312f..52daf425f55 100644
--- a/be/src/pipeline/exec/result_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_sink_operator.cpp
@@ -17,6 +17,8 @@
 
 #include "result_sink_operator.h"
 
+#include <sys/select.h>
+
 #include <memory>
 
 #include "common/config.h"
@@ -96,7 +98,8 @@ Status ResultSinkLocalState::open(RuntimeState* state) {
     }
     case TResultSinkType::ARROW_FLIGHT_PROTOCAL: {
         std::shared_ptr<arrow::Schema> arrow_schema;
-        RETURN_IF_ERROR(convert_expr_ctxs_arrow_schema(_output_vexpr_ctxs, &arrow_schema));
+        RETURN_IF_ERROR(convert_expr_ctxs_arrow_schema(_output_vexpr_ctxs, &arrow_schema,
+                                                       state->timezone()));
         state->exec_env()->result_mgr()->register_arrow_schema(state->fragment_instance_id(),
                                                                arrow_schema);
         _writer.reset(new (std::nothrow) vectorized::VArrowFlightResultWriter(
diff --git a/be/src/util/arrow/row_batch.cpp b/be/src/util/arrow/row_batch.cpp
index cb4ccf15ba2..ba6f4adf6c6 100644
--- a/be/src/util/arrow/row_batch.cpp
+++ b/be/src/util/arrow/row_batch.cpp
@@ -46,7 +46,8 @@ namespace doris {
 
 using strings::Substitute;
 
-Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr<arrow::DataType>* result) {
+Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr<arrow::DataType>* result,
+                             const std::string& timezone) {
     switch (type.type) {
     case TYPE_NULL:
         *result = arrow::null();
@@ -96,11 +97,11 @@ Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr<arrow::
         break;
     case TYPE_DATETIMEV2:
         if (type.scale > 3) {
-            *result = std::make_shared<arrow::TimestampType>(arrow::TimeUnit::MICRO);
+            *result = std::make_shared<arrow::TimestampType>(arrow::TimeUnit::MICRO, timezone);
         } else if (type.scale > 0) {
-            *result = std::make_shared<arrow::TimestampType>(arrow::TimeUnit::MILLI);
+            *result = std::make_shared<arrow::TimestampType>(arrow::TimeUnit::MILLI, timezone);
         } else {
-            *result = std::make_shared<arrow::TimestampType>(arrow::TimeUnit::SECOND);
+            *result = std::make_shared<arrow::TimestampType>(arrow::TimeUnit::SECOND, timezone);
         }
         break;
     case TYPE_DECIMALV2:
@@ -120,7 +121,7 @@ Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr<arrow::
     case TYPE_ARRAY: {
         DCHECK_EQ(type.children.size(), 1);
         std::shared_ptr<arrow::DataType> item_type;
-        static_cast<void>(convert_to_arrow_type(type.children[0], &item_type));
+        static_cast<void>(convert_to_arrow_type(type.children[0], &item_type, timezone));
         *result = std::make_shared<arrow::ListType>(item_type);
         break;
     }
@@ -128,8 +129,8 @@ Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr<arrow::
         DCHECK_EQ(type.children.size(), 2);
         std::shared_ptr<arrow::DataType> key_type;
         std::shared_ptr<arrow::DataType> val_type;
-        static_cast<void>(convert_to_arrow_type(type.children[0], &key_type));
-        static_cast<void>(convert_to_arrow_type(type.children[1], &val_type));
+        static_cast<void>(convert_to_arrow_type(type.children[0], &key_type, timezone));
+        static_cast<void>(convert_to_arrow_type(type.children[1], &val_type, timezone));
         *result = std::make_shared<arrow::MapType>(key_type, val_type);
         break;
     }
@@ -138,7 +139,7 @@ Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr<arrow::
         std::vector<std::shared_ptr<arrow::Field>> fields;
         for (size_t i = 0; i < type.children.size(); i++) {
             std::shared_ptr<arrow::DataType> field_type;
-            static_cast<void>(convert_to_arrow_type(type.children[i], &field_type));
+            static_cast<void>(convert_to_arrow_type(type.children[i], &field_type, timezone));
             fields.push_back(std::make_shared<arrow::Field>(type.field_names[i], field_type,
                                                             type.contains_nulls[i]));
         }
@@ -156,20 +157,22 @@ Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr<arrow::
     return Status::OK();
 }
 
-Status convert_to_arrow_field(SlotDescriptor* desc, std::shared_ptr<arrow::Field>* field) {
+Status convert_to_arrow_field(SlotDescriptor* desc, std::shared_ptr<arrow::Field>* field,
+                              const std::string& timezone) {
     std::shared_ptr<arrow::DataType> type;
-    RETURN_IF_ERROR(convert_to_arrow_type(desc->type(), &type));
+    RETURN_IF_ERROR(convert_to_arrow_type(desc->type(), &type, timezone));
     *field = arrow::field(desc->col_name(), type, desc->is_nullable());
     return Status::OK();
 }
 
 Status convert_block_arrow_schema(const vectorized::Block& block,
-                                  std::shared_ptr<arrow::Schema>* result) {
+                                  std::shared_ptr<arrow::Schema>* result,
+                                  const std::string& timezone) {
     std::vector<std::shared_ptr<arrow::Field>> fields;
     for (const auto& type_and_name : block) {
         std::shared_ptr<arrow::DataType> arrow_type;
         RETURN_IF_ERROR(convert_to_arrow_type(type_and_name.type->get_type_as_type_descriptor(),
-                                              &arrow_type));
+                                              &arrow_type, timezone));
         fields.push_back(std::make_shared<arrow::Field>(type_and_name.name, arrow_type,
                                                         type_and_name.type->is_nullable()));
     }
@@ -178,12 +181,13 @@ Status convert_block_arrow_schema(const vectorized::Block& block,
 }
 
 Status convert_to_arrow_schema(const RowDescriptor& row_desc,
-                               std::shared_ptr<arrow::Schema>* result) {
+                               std::shared_ptr<arrow::Schema>* result,
+                               const std::string& timezone) {
     std::vector<std::shared_ptr<arrow::Field>> fields;
     for (auto tuple_desc : row_desc.tuple_descriptors()) {
         for (auto desc : tuple_desc->slots()) {
             std::shared_ptr<arrow::Field> field;
-            RETURN_IF_ERROR(convert_to_arrow_field(desc, &field));
+            RETURN_IF_ERROR(convert_to_arrow_field(desc, &field, timezone));
             fields.push_back(field);
         }
     }
@@ -192,12 +196,13 @@ Status convert_to_arrow_schema(const RowDescriptor& row_desc,
 }
 
 Status convert_expr_ctxs_arrow_schema(const vectorized::VExprContextSPtrs& output_vexpr_ctxs,
-                                      std::shared_ptr<arrow::Schema>* result) {
+                                      std::shared_ptr<arrow::Schema>* result,
+                                      const std::string& timezone) {
     std::vector<std::shared_ptr<arrow::Field>> fields;
     for (int i = 0; i < output_vexpr_ctxs.size(); i++) {
         std::shared_ptr<arrow::DataType> arrow_type;
         auto root_expr = output_vexpr_ctxs.at(i)->root();
-        RETURN_IF_ERROR(convert_to_arrow_type(root_expr->type(), &arrow_type));
+        RETURN_IF_ERROR(convert_to_arrow_type(root_expr->type(), &arrow_type, timezone));
         auto field_name = root_expr->is_slot_ref() && !root_expr->expr_label().empty()
                                   ? root_expr->expr_label()
                                   : fmt::format("{}_{}", root_expr->data_type()->get_name(), i);
diff --git a/be/src/util/arrow/row_batch.h b/be/src/util/arrow/row_batch.h
index ddffc3324d3..9a33719a1cf 100644
--- a/be/src/util/arrow/row_batch.h
+++ b/be/src/util/arrow/row_batch.h
@@ -41,17 +41,20 @@ namespace doris {
 
 class RowDescriptor;
 
-Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr<arrow::DataType>* result);
+Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr<arrow::DataType>* result,
+                             const std::string& timezone);
 
 // Convert Doris RowDescriptor to Arrow Schema.
 Status convert_to_arrow_schema(const RowDescriptor& row_desc,
-                               std::shared_ptr<arrow::Schema>* result);
+                               std::shared_ptr<arrow::Schema>* result, const std::string& timezone);
 
 Status convert_block_arrow_schema(const vectorized::Block& block,
-                                  std::shared_ptr<arrow::Schema>* result);
+                                  std::shared_ptr<arrow::Schema>* result,
+                                  const std::string& timezone);
 
 Status convert_expr_ctxs_arrow_schema(const vectorized::VExprContextSPtrs& output_vexpr_ctxs,
-                                      std::shared_ptr<arrow::Schema>* result);
+                                      std::shared_ptr<arrow::Schema>* result,
+                                      const std::string& timezone);
 
 Status serialize_record_batch(const arrow::RecordBatch& record_batch, std::string* result);
 
diff --git a/be/src/vec/runtime/vparquet_transformer.cpp b/be/src/vec/runtime/vparquet_transformer.cpp
index 1969858349f..f0810d6c7ce 100644
--- a/be/src/vec/runtime/vparquet_transformer.cpp
+++ b/be/src/vec/runtime/vparquet_transformer.cpp
@@ -266,7 +266,8 @@ Status VParquetTransformer::_parse_schema() {
     std::vector<std::shared_ptr<arrow::Field>> fields;
     for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) {
         std::shared_ptr<arrow::DataType> type;
-        RETURN_IF_ERROR(convert_to_arrow_type(_output_vexpr_ctxs[i]->root()->type(), &type));
+        RETURN_IF_ERROR(convert_to_arrow_type(_output_vexpr_ctxs[i]->root()->type(), &type,
+                                              _state->timezone()));
         if (_parquet_schemas != nullptr) {
             std::shared_ptr<arrow::Field> field =
                     arrow::field(_parquet_schemas->operator[](i).schema_column_name, type,
diff --git a/be/src/vec/sink/vmemory_scratch_sink.cpp b/be/src/vec/sink/vmemory_scratch_sink.cpp
index eca9e65ab49..904af9a9f1b 100644
--- a/be/src/vec/sink/vmemory_scratch_sink.cpp
+++ b/be/src/vec/sink/vmemory_scratch_sink.cpp
@@ -88,7 +88,7 @@ Status MemoryScratchSink::send(RuntimeState* state, Block* input_block, bool eos
                                                                        
*input_block, &block));
     std::shared_ptr<arrow::Schema> block_arrow_schema;
     // After expr executed, use recaculated schema as final schema
-    RETURN_IF_ERROR(convert_block_arrow_schema(block, &block_arrow_schema));
+    RETURN_IF_ERROR(convert_block_arrow_schema(block, &block_arrow_schema, state->timezone()));
     RETURN_IF_ERROR(convert_to_arrow_batch(block, block_arrow_schema, arrow::default_memory_pool(),
                                            &result, _timezone_obj));
     _queue->blocking_put(result);
diff --git a/be/src/vec/sink/vresult_sink.cpp b/be/src/vec/sink/vresult_sink.cpp
index e089f788160..dc63fdb4be6 100644
--- a/be/src/vec/sink/vresult_sink.cpp
+++ b/be/src/vec/sink/vresult_sink.cpp
@@ -18,6 +18,7 @@
 #include "vec/sink/vresult_sink.h"
 
 #include <fmt/format.h>
+#include <sys/select.h>
 #include <time.h>
 
 #include <new>
@@ -105,7 +106,8 @@ Status VResultSink::prepare(RuntimeState* state) {
     }
     case TResultSinkType::ARROW_FLIGHT_PROTOCAL: {
         std::shared_ptr<arrow::Schema> arrow_schema;
-        RETURN_IF_ERROR(convert_expr_ctxs_arrow_schema(_output_vexpr_ctxs, &arrow_schema));
+        RETURN_IF_ERROR(convert_expr_ctxs_arrow_schema(_output_vexpr_ctxs, &arrow_schema,
+                                                       state->timezone()));
         state->exec_env()->result_mgr()->register_arrow_schema(state->fragment_instance_id(),
                                                                arrow_schema);
         _writer.reset(new (std::nothrow) VArrowFlightResultWriter(_sender.get(), _output_vexpr_ctxs,
diff --git a/be/test/vec/data_types/serde/data_type_serde_arrow_test.cpp b/be/test/vec/data_types/serde/data_type_serde_arrow_test.cpp
index 4793ae8128a..fc692b8f675 100644
--- a/be/test/vec/data_types/serde/data_type_serde_arrow_test.cpp
+++ b/be/test/vec/data_types/serde/data_type_serde_arrow_test.cpp
@@ -489,7 +489,7 @@ void serialize_and_deserialize_arrow_test() {
     RowDescriptor row_desc(&tuple_desc, true);
     // arrow schema
     std::shared_ptr<arrow::Schema> _arrow_schema;
-    EXPECT_EQ(convert_to_arrow_schema(row_desc, &_arrow_schema), Status::OK());
+    EXPECT_EQ(convert_to_arrow_schema(row_desc, &_arrow_schema, "UTC"), Status::OK());
 
     // serialize
     std::shared_ptr<arrow::RecordBatch> result;
@@ -623,7 +623,7 @@ TEST(DataTypeSerDeArrowTest, DataTypeMapNullKeySerDeTest) {
     RowDescriptor row_desc(&tuple_desc, true);
     // arrow schema
     std::shared_ptr<arrow::Schema> _arrow_schema;
-    EXPECT_EQ(convert_to_arrow_schema(row_desc, &_arrow_schema), Status::OK());
+    EXPECT_EQ(convert_to_arrow_schema(row_desc, &_arrow_schema, "UTC"), Status::OK());
 
     // serialize
     std::shared_ptr<arrow::RecordBatch> result;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to