This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new cdf5f0fe687 [fix](pipelineX) mark join column should be nullable (#25275) cdf5f0fe687 is described below commit cdf5f0fe687091087402a4e36c253548e21ac541 Author: Mryange <59914473+mrya...@users.noreply.github.com> AuthorDate: Wed Oct 11 11:35:43 2023 +0800 [fix](pipelineX) mark join column should be nullable (#25275) --- be/src/pipeline/exec/hashjoin_build_sink.cpp | 11 +++++---- be/src/pipeline/exec/hashjoin_probe_operator.cpp | 24 +++++++++++-------- be/src/pipeline/exec/join_build_sink_operator.h | 2 -- be/src/pipeline/exec/join_probe_operator.cpp | 8 +++---- be/src/pipeline/exec/join_probe_operator.h | 1 + .../exec/nested_loop_join_probe_operator.cpp | 27 ++++++++-------------- be/src/pipeline/pipeline_x/dependency.h | 1 + be/src/vec/exec/join/vhash_join_node.cpp | 3 ++- 8 files changed, 39 insertions(+), 38 deletions(-) diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index 8dd84dfd27b..3b0342a926b 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -139,7 +139,8 @@ Status HashJoinBuildSinkLocalState::open(RuntimeState* state) { void HashJoinBuildSinkLocalState::init_short_circuit_for_probe() { auto& p = _parent->cast<HashJoinBuildSinkOperatorX>(); _shared_state->short_circuit_for_probe = - (_has_null_in_build_side && p._join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) || + (_shared_state->_has_null_in_build_side && + p._join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && !p._is_mark_join) || (_shared_state->build_blocks->empty() && p._join_op == TJoinOp::INNER_JOIN && !p._is_mark_join) || (_shared_state->build_blocks->empty() && p._join_op == TJoinOp::LEFT_SEMI_JOIN && @@ -203,7 +204,7 @@ Status HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state, has_null_value || short_circuit_for_null_in_build_side ? &null_map_val->get_data() : nullptr, - &_has_null_in_build_side); + &_shared_state->_has_null_in_build_side); }}, *_shared_state->hash_table_variants, vectorized::make_bool_variant(_build_side_ignore_null), @@ -452,7 +453,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* // make one block for each 4 gigabytes constexpr static auto BUILD_BLOCK_MAX_SIZE = 4 * 1024UL * 1024UL * 1024UL; - if (local_state._has_null_in_build_side) { + if (local_state._shared_state->_has_null_in_build_side) { // TODO: if _has_null_in_build_side is true we should finish current pipeline task. DCHECK(state->enable_pipeline_exec()); return Status::OK(); @@ -538,7 +539,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* _shared_hash_table_context->hash_table_variants = local_state._shared_state->hash_table_variants; _shared_hash_table_context->short_circuit_for_null_in_probe_side = - local_state._has_null_in_build_side; + local_state._shared_state->_has_null_in_build_side; if (local_state._runtime_filter_slots) { local_state._runtime_filter_slots->copy_to_shared_context( _shared_hash_table_context); @@ -556,7 +557,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* local_state.profile()->add_info_string( "SharedHashTableFrom", print_id(_shared_hashtable_controller->get_builder_fragment_instance_id(id()))); - local_state._has_null_in_build_side = + local_state._shared_state->_has_null_in_build_side = _shared_hash_table_context->short_circuit_for_null_in_probe_side; local_state._shared_state->hash_table_variants = std::static_pointer_cast<vectorized::HashTableVariants>( diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp b/be/src/pipeline/exec/hashjoin_probe_operator.cpp index 0a4b528be38..a4a66507085 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp +++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp @@ -19,6 +19,7 @@ #include <string> +#include "common/logging.h" #include "pipeline/exec/operator.h" namespace doris { @@ -184,9 +185,15 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc local_state.init_for_probe(state); SCOPED_TIMER(local_state._probe_timer); if (local_state._shared_state->short_circuit_for_probe) { - /// If `_short_circuit_for_probe` is true, this indicates no rows - /// match the join condition, and this is 'mark join', so we need to create a column as mark - /// with all rows set to 0. + // If we use a short-circuit strategy, should return empty block directly. + source_state = SourceState::FINISHED; + return Status::OK(); + } + if (local_state._shared_state->_has_null_in_build_side && + _short_circuit_for_null_in_build_side) { + /// `_has_null_in_build_side` means have null value in build side. + /// `_short_circuit_for_null_in_build_side` means short circuit if has null in build side(e.g. null aware left anti join). + /// We need to create a column as mark with all rows set to NULL. if (_is_mark_join) { auto block_rows = local_state._probe_block.rows(); if (block_rows == 0) { @@ -203,9 +210,11 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc temp_block.insert(local_state._probe_block.get_by_position(i)); } } - auto mark_column = vectorized::ColumnUInt8::create(block_rows, 0); - temp_block.insert( - {std::move(mark_column), std::make_shared<vectorized::DataTypeUInt8>(), ""}); + auto mark_column = vectorized::ColumnNullable::create( + vectorized::ColumnUInt8::create(block_rows, 0), + vectorized::ColumnUInt8::create(block_rows, 1)); + temp_block.insert({std::move(mark_column), + make_nullable(std::make_shared<vectorized::DataTypeUInt8>()), ""}); { SCOPED_TIMER(local_state._join_filter_timer); @@ -220,9 +229,6 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc local_state.reached_limit(output_block, source_state); return Status::OK(); } - // If we use a short-circuit strategy, should return empty block directly. - source_state = SourceState::FINISHED; - return Status::OK(); } local_state._join_block.clear_column_data(); diff --git a/be/src/pipeline/exec/join_build_sink_operator.h b/be/src/pipeline/exec/join_build_sink_operator.h index 2f7a3ec03ea..5f440836874 100644 --- a/be/src/pipeline/exec/join_build_sink_operator.h +++ b/be/src/pipeline/exec/join_build_sink_operator.h @@ -40,8 +40,6 @@ protected: template <typename LocalStateType> friend class JoinBuildSinkOperatorX; - bool _has_null_in_build_side = false; - RuntimeProfile::Counter* _build_rows_counter; RuntimeProfile::Counter* _push_down_timer; RuntimeProfile::Counter* _push_compute_timer; diff --git a/be/src/pipeline/exec/join_probe_operator.cpp b/be/src/pipeline/exec/join_probe_operator.cpp index c4776afe019..63074bed70c 100644 --- a/be/src/pipeline/exec/join_probe_operator.cpp +++ b/be/src/pipeline/exec/join_probe_operator.cpp @@ -66,9 +66,8 @@ void JoinProbeLocalState<DependencyType, Derived>::_construct_mutable_join_block } } if (p._is_mark_join) { - _join_block.replace_by_position( - _join_block.columns() - 1, - remove_nullable(_join_block.get_by_position(_join_block.columns() - 1).column)); + DCHECK(!p._is_mark_join || + _join_block.get_by_position(_join_block.columns() - 1).column->is_nullable()); } } @@ -183,7 +182,8 @@ JoinProbeOperatorX<LocalStateType>::JoinProbeOperatorX(ObjectPool* pool, const T ? tnode.nested_loop_join_node.is_mark : false) : tnode.hash_join_node.__isset.is_mark ? tnode.hash_join_node.is_mark - : false) { + : false), + _short_circuit_for_null_in_build_side(_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { if (tnode.__isset.hash_join_node) { _intermediate_row_desc.reset(new RowDescriptor( descs, tnode.hash_join_node.vintermediate_tuple_id_list, diff --git a/be/src/pipeline/exec/join_probe_operator.h b/be/src/pipeline/exec/join_probe_operator.h index 863160b83f6..5727318a4f4 100644 --- a/be/src/pipeline/exec/join_probe_operator.h +++ b/be/src/pipeline/exec/join_probe_operator.h @@ -113,6 +113,7 @@ protected: // output expr vectorized::VExprContextSPtrs _output_expr_ctxs; OperatorXPtr _build_side_child; + const bool _short_circuit_for_null_in_build_side; }; } // namespace pipeline diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp index ecac7c94dd1..14e19dd352f 100644 --- a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp +++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp @@ -22,6 +22,7 @@ #include "pipeline/exec/operator.h" #include "vec/core/block.h" #include "vec/exec/join/vnested_loop_join_node.h" +#include "vec/columns/column_filter_helper.h" namespace doris { class RuntimeState; @@ -102,12 +103,9 @@ void NestedLoopJoinProbeLocalState::_update_additional_flags(vectorized::Block* } } if (p._is_mark_join) { - vectorized::IColumn::Filter& mark_data = - assert_cast<doris::vectorized::ColumnVector<vectorized::UInt8>&>( - *block->get_by_position(block->columns() - 1).column->assume_mutable()) - .get_data(); - if (mark_data.size() < block->rows()) { - mark_data.resize_fill(block->rows(), 1); + auto mark_column = block->get_by_position(block->columns() - 1).column->assume_mutable(); + if (mark_column->size() < block->rows()) { + vectorized::ColumnFilterHelper(*mark_column).resize_fill(block->rows(), 1); } } } @@ -343,15 +341,12 @@ void NestedLoopJoinProbeLocalState::_finalize_current_phase(vectorized::MutableB _resize_fill_tuple_is_null_column(new_size, 0, 1); } } else { - vectorized::IColumn::Filter& mark_data = - assert_cast<doris::vectorized::ColumnVector<vectorized::UInt8>&>( - *dst_columns[dst_columns.size() - 1]) - .get_data(); - mark_data.reserve(mark_data.size() + _left_side_process_count); + vectorized::ColumnFilterHelper mark_column(*dst_columns[dst_columns.size() - 1]); + mark_column.reserve(mark_column.size() + _left_side_process_count); DCHECK_LE(_left_block_start_pos + _left_side_process_count, _child_block->rows()); for (int j = _left_block_start_pos; j < _left_block_start_pos + _left_side_process_count; ++j) { - mark_data.emplace_back(IsSemi == _cur_probe_row_visited_flags[j]); + mark_column.insert_value(IsSemi == _cur_probe_row_visited_flags[j]); } for (size_t i = 0; i < p._num_probe_side_columns; ++i) { const vectorized::ColumnWithTypeAndName src_column = @@ -396,11 +391,9 @@ void NestedLoopJoinProbeLocalState::_append_left_data_with_null( for (size_t i = 0; i < p._num_build_side_columns; ++i) { dst_columns[p._num_probe_side_columns + i]->insert_many_defaults(_left_side_process_count); } - vectorized::IColumn::Filter& mark_data = - assert_cast<doris::vectorized::ColumnVector<vectorized::UInt8>&>( - *dst_columns[dst_columns.size() - 1]) - .get_data(); - mark_data.resize_fill(mark_data.size() + _left_side_process_count, 0); + auto& mark_column = *dst_columns[dst_columns.size() - 1]; + vectorized::ColumnFilterHelper(mark_column) + .resize_fill(mark_column.size() + _left_side_process_count, 0); } void NestedLoopJoinProbeLocalState::_process_left_child_block( diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h index 5c9ebb452e1..11e6a0b5c93 100644 --- a/be/src/pipeline/pipeline_x/dependency.h +++ b/be/src/pipeline/pipeline_x/dependency.h @@ -548,6 +548,7 @@ struct JoinSharedState { // For some join case, we can apply a short circuit strategy // 1. _has_null_in_build_side = true // 2. build side rows is empty, Join op is: inner join/right outer join/left semi/right semi/right anti + bool _has_null_in_build_side = false; bool short_circuit_for_probe = false; vectorized::JoinOpVariants join_op_variants; }; diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index 5f769e4cafe..3ec7d364d56 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -159,7 +159,8 @@ HashJoinProbeContext::HashJoinProbeContext(pipeline::HashJoinProbeLocalState* lo _probe_key_sz(local_state->_shared_state->probe_key_sz), _left_output_slot_flags(&local_state->join_probe()->_left_output_slot_flags), _right_output_slot_flags(&local_state->join_probe()->_right_output_slot_flags), - _is_any_probe_match_row_output(&local_state->_is_any_probe_match_row_output) {} + _is_any_probe_match_row_output(&local_state->_is_any_probe_match_row_output), + _has_null_value_in_build_side(local_state->_shared_state->_has_null_in_build_side) {} HashJoinBuildContext::HashJoinBuildContext(HashJoinNode* join_node) : _hash_table_memory_usage(join_node->_hash_table_memory_usage), --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org