mrhhsg commented on code in PR #58228:
URL: https://github.com/apache/doris/pull/58228#discussion_r2638828483
##########
be/src/pipeline/exec/table_function_operator.cpp:
##########
@@ -209,13 +222,214 @@ Status
TableFunctionLocalState::get_expanded_block(RuntimeState* state,
}
}
- _copy_output_slots(columns);
+ _copy_output_slots(columns, p);
size_t row_size = columns[p._child_slots.size()]->size();
for (auto index : p._useless_slot_indexs) {
columns[index]->insert_many_defaults(row_size -
columns[index]->size());
}
+ {
+ SCOPED_TIMER(_filter_timer); // 3. eval conjuncts
+
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_expand_conjuncts_ctxs,
output_block,
+
output_block->columns()));
+ RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_conjuncts,
output_block,
+
output_block->columns()));
+ }
+
+ *eos = _child_eos && _cur_child_offset == -1;
+ return Status::OK();
+}
+
+Status TableFunctionLocalState::_get_expanded_block_for_outer_conjuncts(
+ RuntimeState* state, vectorized::Block* output_block, bool* eos) {
+ auto& p = _parent->cast<TableFunctionOperatorX>();
+ vectorized::MutableBlock m_block =
vectorized::VectorizedUtils::build_mutable_mem_reuse_block(
+ output_block, p._output_slots);
+ vectorized::MutableColumns& columns = m_block.mutable_columns();
+ auto child_slot_count = p._child_slots.size();
+ for (int i = 0; i < p._fn_num; i++) {
+ if (columns[i + child_slot_count]->is_nullable()) {
+ _fns[i]->set_nullable();
+ }
+ }
+
+ std::vector<int64_t> child_row_to_output_rows_indices;
+ std::vector<size_t> handled_row_indices;
+ bool child_block_empty = _child_block->empty();
+ if (!child_block_empty) {
+ child_row_to_output_rows_indices.push_back(0);
+ }
+
+ auto batch_size = state->batch_size();
+ auto output_row_count = columns[child_slot_count]->size();
+ while (output_row_count < batch_size) {
+ RETURN_IF_CANCELLED(state);
+
+ // finished handling current child block
+ if (_cur_child_offset == -1) {
+ break;
+ }
+
+ bool skip_child_row = false;
+ while (output_row_count < batch_size) {
+ // if table function is not outer and has empty result, go to next
child row
+ if (_fns[0]->eos() || skip_child_row) {
+ _copy_output_slots(columns, p);
+ if (!skip_child_row) {
+ handled_row_indices.push_back(_cur_child_offset);
+
child_row_to_output_rows_indices.push_back(output_row_count);
+ }
+ process_next_child_row();
+ if (_cur_child_offset == -1) {
+ break;
+ }
+ }
+ if (skip_child_row = _is_inner_and_empty(); skip_child_row) {
+ _child_rows_has_output[_cur_child_offset] = true;
+ continue;
+ }
+
+ // It may take multiple iterations of this while loop to process a
child row if
+ // the table function produces a large number of rows.
+ auto repeat_times = _fns[0]->get_value(columns[child_slot_count],
+ batch_size -
(int)output_row_count);
+ _current_row_insert_times += repeat_times;
+ output_row_count = columns[child_slot_count]->size();
+ }
+ }
+ // There are two scenarios in which the loop above exits:
+ // 1. the current child block has been fully processed
+ // _cur_child_offset == -1
+ // 2. output_block reaches batch size
+ // fn may or may not be at eos
+ if (output_row_count >= batch_size) {
+ _copy_output_slots(columns, p);
+ handled_row_indices.push_back(_cur_child_offset);
+ child_row_to_output_rows_indices.push_back(output_row_count);
+ if (_fns[0]->eos()) {
+ process_next_child_row();
+ }
+ }
+ for (auto index : p._useless_slot_indexs) {
+ columns[index]->insert_many_defaults(output_row_count -
columns[index]->size());
+ }
+ output_block->set_columns(std::move(columns));
+
+ /**
+ Handle the outer conjuncts after unnest. Currently, only left outer is
supported.
+ e.g., for the following example data,
+ select id, name, tags from items_dict_unnest_t order by id;
+
+------+---------------------+-------------------------------------------------+
+ | id | name | tags
|
+
+------+---------------------+-------------------------------------------------+
+ | 1 | Laptop | ["Electronics", "Office", "High-End",
"Laptop"] |
+ | 2 | Mechanical Keyboard | ["Electronics", "Accessories"]
|
+ | 3 | Basketball | ["Sports", "Outdoor"]
|
+ | 4 | Badminton Racket | ["Sports", "Equipment"]
|
+ | 5 | Shirt | ["Clothing", "Office", "Shirt"]
|
+
+------+---------------------+-------------------------------------------------+
+
+ for this query: ``` SELECT
+ id,
+ name,
+ tags,
+ t.tag
+ FROM
+ items_dict_unnest_t
+ LEFT JOIN lateral unnest(tags) AS t(tag) ON t.tag = name;```
+
+ after unnest, before evaluating the outer conjuncts, the result is:
+
+------+---------------------+-------------------------------------------------+--------------+
+ | id | name | tags
| unnest(tags) |
+
+------+---------------------+-------------------------------------------------+--------------+
+ | 1 | Laptop | ["Electronics", "Office", "High-End",
"Laptop"] | Electronics |
+ | 1 | Laptop | ["Electronics", "Office", "High-End",
"Laptop"] | Office |
+ | 1 | Laptop | ["Electronics", "Office", "High-End",
"Laptop"] | High-End |
+ | 1 | Laptop | ["Electronics", "Office", "High-End",
"Laptop"] | Laptop |
+ | 2 | Mechanical Keyboard | ["Electronics", "Accessories"]
| Electronics |
+ | 2 | Mechanical Keyboard | ["Electronics", "Accessories"]
| Accessories |
+ | 3 | Basketball | ["Sports", "Outdoor"]
| Sports |
+ | 3 | Basketball | ["Sports", "Outdoor"]
| Outdoor |
+ | 4 | Badminton Racket | ["Sports", "Equipment"]
| Sports |
+ | 4 | Badminton Racket | ["Sports", "Equipment"]
| Equipment |
+ | 5 | Shirt | ["Clothing", "Office", "Shirt"]
| Clothing |
+ | 5 | Shirt | ["Clothing", "Office", "Shirt"]
| Office |
+ | 5 | Shirt | ["Clothing", "Office", "Shirt"]
| Shirt |
+
+------+---------------------+-------------------------------------------------+--------------+
+ 13 rows in set (0.47 sec)
+
+ the vector child_row_to_output_rows_indices is used to record the mapping
relationship
+ between child rows and output rows, for example:
+ child row 0 -> output rows [0, 4)
+ child row 1 -> output rows [4, 6)
+ child row 2 -> output rows [6, 8)
+ child row 3 -> output rows [8, 10)
+ child row 4 -> output rows [10, 13)
+ its contents are: [0, 4, 6, 8, 10, 13].
+
+ After evaluating the left join conjuncts `t.tag = name`,
+ the content of filter is: [0, 0, 0, 1, // child row 0
+ 0, 0, // child row 1
+ 0, 0, // child row 2
+ 0, 0, // child row 3
+ 0, 0, 1 // child row 4
+ ]
+ child rows 1, 2, 3 are all filtered out, so we need to insert one row with
NULL tag value for each of them.
+ */
+ if (!child_block_empty) {
+ vectorized::IColumn::Filter filter;
+ auto column_count = output_block->columns();
+ vectorized::ColumnNumbers columns_to_filter(column_count);
+ std::iota(columns_to_filter.begin(), columns_to_filter.end(), 0);
+
RETURN_IF_ERROR(vectorized::VExprContext::execute_conjuncts_and_filter_block(
+ _expand_conjuncts_ctxs, output_block, columns_to_filter,
column_count, filter));
+ size_t remain_row_count = output_block->rows();
+ // for an outer table function, we need to handle those child rows whose
expanded rows are all filtered out
+ auto handled_child_row_count = handled_row_indices.size();
+ if (remain_row_count < output_row_count) {
+ for (size_t i = 0; i < handled_child_row_count; ++i) {
+ auto start_row_idx = child_row_to_output_rows_indices[i];
+ auto end_row_idx = child_row_to_output_rows_indices[i + 1];
+ if (simd::contain_byte((uint8_t*)filter.data() + start_row_idx,
+ end_row_idx - start_row_idx, 1)) {
+ _child_rows_has_output[handled_row_indices[i]] = true;
+ }
+ }
+ } else {
+ for (auto row_idx : handled_row_indices) {
+ _child_rows_has_output[row_idx] = true;
+ }
+ }
+
+ if (-1 == _cur_child_offset) {
+ // Finished handling current child block,
+ vectorized::MutableBlock m_block2 =
+
vectorized::VectorizedUtils::build_mutable_mem_reuse_block(output_block,
+
p._output_slots);
+ vectorized::MutableColumns& columns2 = m_block2.mutable_columns();
+ auto child_block_row_count = _child_block->rows();
+ for (size_t i = 0; i != child_block_row_count; i++) {
Review Comment:
Maybe we can first collect the indices and then insert them column by column.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]