sjyango commented on code in PR #58993:
URL: https://github.com/apache/doris/pull/58993#discussion_r2633579524


##########
be/src/vec/aggregate_functions/aggregate_function_python_udaf.cpp:
##########
@@ -247,15 +252,53 @@ void AggregatePythonUDAF::add(AggregateDataPtr __restrict 
place, const IColumn**
 void AggregatePythonUDAF::add_batch(size_t batch_size, AggregateDataPtr* 
places,
                                     size_t place_offset, const IColumn** 
columns, Arena&,
                                     bool /*agg_many*/) const {
-    // With shared client optimization, all places use the same Python process
-    // We still need to add each row individually because they go to different 
place_ids
+    // Build places array for this batch
+    std::vector<int64_t> places_array(batch_size);
     for (size_t i = 0; i < batch_size; ++i) {
-        int64_t place_id = reinterpret_cast<int64_t>(places[i] + place_offset);
-        Status st = this->data(places[i] + place_offset)
-                            .add(place_id, columns, i, i + 1, argument_types);
-        if (UNLIKELY(!st.ok())) {
-            throw doris::Exception(ErrorCode::INTERNAL_ERROR, st.to_string());
+        places_array[i] = reinterpret_cast<int64_t>(places[i] + place_offset);
+    }
+
+    // Build input block for entire batch
+    Block input_block;
+    for (size_t i = 0; i < argument_types.size(); ++i) {
+        input_block.insert(
+                ColumnWithTypeAndName(columns[i]->get_ptr(), 
argument_types[i], std::to_string(i)));
+    }
+
+    std::shared_ptr<arrow::Schema> schema;
+    cctz::time_zone timezone_obj;
+    std::call_once(_cache_init_flag, [this, &input_block]() {
+        DCHECK(!this->argument_types.empty()) << "Argument types must not be 
empty";
+        Status schema_status = get_arrow_schema_from_block(input_block, 
&this->_cached_arrow_schema,
+                                                           
TimezoneUtils::default_time_zone);
+        if (UNLIKELY(!schema_status.ok())) {
+            throw doris::Exception(ErrorCode::INTERNAL_ERROR, 
schema_status.to_string());
         }
+
+        TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone,
+                                           this->_cached_timezone);
+    });
+
+    schema = _cached_arrow_schema;
+    timezone_obj = _cached_timezone;
+
+    std::shared_ptr<arrow::RecordBatch> batch;
+    Status batch_status = convert_to_arrow_batch(input_block, schema, 
arrow::default_memory_pool(),
+                                                 &batch, timezone_obj);
+    if (UNLIKELY(!batch_status.ok())) {
+        throw doris::Exception(ErrorCode::INTERNAL_ERROR, 
batch_status.to_string());
+    }
+
+    // Get client from first place (all places in same thread share the same 
client)
+    auto client = this->data(places[0] + place_offset).client;

Review Comment:
   Revert the optimized code in add_batch, because it has a bug when the thread 
changes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to