This is an automated email from the ASF dual-hosted git repository. lihaopeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 06ff59bc03d [Performance](sink) SIMD the tablet sink valied data function (#25480) 06ff59bc03d is described below commit 06ff59bc03db294508bb0d169292d4a3302ea974 Author: HappenLee <happen...@hotmail.com> AuthorDate: Tue Oct 17 16:21:08 2023 +0800 [Performance](sink) SIMD the tablet sink valied data function (#25480) --- be/src/vec/sink/vtablet_block_convertor.cpp | 173 ++++++++++++++-------------- be/src/vec/sink/vtablet_block_convertor.h | 12 +- be/src/vec/sink/vtablet_sink_v2.cpp | 5 +- be/src/vec/sink/writer/vtablet_writer.cpp | 13 +-- 4 files changed, 104 insertions(+), 99 deletions(-) diff --git a/be/src/vec/sink/vtablet_block_convertor.cpp b/be/src/vec/sink/vtablet_block_convertor.cpp index da9e3fc743c..436eb3639de 100644 --- a/be/src/vec/sink/vtablet_block_convertor.cpp +++ b/be/src/vec/sink/vtablet_block_convertor.cpp @@ -72,12 +72,12 @@ Status OlapTableBlockConvertor::validate_and_convert_block( RETURN_IF_ERROR(_fill_auto_inc_cols(block.get(), rows)); } - int64_t filtered_rows = 0; + int filtered_rows = 0; { SCOPED_RAW_TIMER(&_validate_data_ns); - _filter_bitmap.Reset(block->rows()); + _filter_map.resize(rows, 0); bool stop_processing = false; - RETURN_IF_ERROR(_validate_data(state, block.get(), filtered_rows, &stop_processing)); + RETURN_IF_ERROR(_validate_data(state, block.get(), rows, filtered_rows, &stop_processing)); _num_filtered_rows += filtered_rows; has_filtered_rows = filtered_rows > 0; if (stop_processing) { @@ -161,11 +161,12 @@ Status OlapTableBlockConvertor::_validate_column(RuntimeState* state, const Type bool is_nullable, vectorized::ColumnPtr column, size_t slot_index, bool* stop_processing, fmt::memory_buffer& error_prefix, + const uint32_t row_count, vectorized::IColumn::Permutation* rows) { - DCHECK((rows == nullptr) || (rows->size() == column->size())); + DCHECK((rows == nullptr) || (rows->size() == row_count)); fmt::memory_buffer error_msg; auto set_invalid_and_append_error_msg = [&](int row) { - _filter_bitmap.Set(row, true); + _filter_map[row] = true; auto ret = state->append_error_msg_to_file([]() -> std::string { return ""; }, [&error_prefix, &error_msg]() -> std::string { return fmt::to_string(error_prefix) + @@ -180,10 +181,9 @@ Status OlapTableBlockConvertor::_validate_column(RuntimeState* state, const Type auto& real_column_ptr = column_ptr == nullptr ? column : (column_ptr->get_nested_column_ptr()); auto null_map = column_ptr == nullptr ? nullptr : column_ptr->get_null_map_data().data(); auto need_to_validate = [&null_map, this](size_t j, size_t row) { - return !_filter_bitmap.Get(row) && (null_map == nullptr || null_map[j] == 0); + return !_filter_map[row] && (null_map == nullptr || null_map[j] == 0); }; - ssize_t last_invalid_row = -1; switch (type.type) { case TYPE_CHAR: case TYPE_VARCHAR: @@ -196,43 +196,49 @@ Status OlapTableBlockConvertor::_validate_column(RuntimeState* state, const Type if (type.len > 0) { limit = std::min(config::string_type_length_soft_limit_bytes, type.len); } - for (size_t j = 0; j < column->size(); ++j) { - auto row = rows ? (*rows)[j] : j; - if (row == last_invalid_row) { - continue; - } - if (need_to_validate(j, row)) { - auto str_val = column_string->get_data_at(j); - bool invalid = str_val.size > limit; - if (invalid) { - last_invalid_row = row; - if (str_val.size > type.len) { - fmt::format_to(error_msg, "{}", - "the length of input is too long than schema. "); - fmt::format_to(error_msg, "first 32 bytes of input str: [{}] ", - str_val.to_prefix(32)); - fmt::format_to(error_msg, "schema length: {}; ", type.len); - fmt::format_to(error_msg, "actual length: {}; ", str_val.size); - } else if (str_val.size > limit) { - fmt::format_to(error_msg, "{}", - "the length of input string is too long than vec schema. "); - fmt::format_to(error_msg, "first 32 bytes of input str: [{}] ", - str_val.to_prefix(32)); - fmt::format_to(error_msg, "schema length: {}; ", type.len); - fmt::format_to(error_msg, "limit length: {}; ", limit); - fmt::format_to(error_msg, "actual length: {}; ", str_val.size); + + auto* __restrict offsets = column_string->get_offsets().data(); + int invalid_count = 0; + for (int j = 0; j < row_count; ++j) { + invalid_count += (offsets[j] - offsets[j - 1]) > limit; + } + + if (invalid_count) { + for (size_t j = 0; j < row_count; ++j) { + auto row = rows ? (*rows)[j] : j; + if (need_to_validate(j, row)) { + auto str_val = column_string->get_data_at(j); + bool invalid = str_val.size > limit; + if (invalid) { + if (str_val.size > type.len) { + fmt::format_to(error_msg, "{}", + "the length of input is too long than schema. "); + fmt::format_to(error_msg, "first 32 bytes of input str: [{}] ", + str_val.to_prefix(32)); + fmt::format_to(error_msg, "schema length: {}; ", type.len); + fmt::format_to(error_msg, "actual length: {}; ", str_val.size); + } else if (str_val.size > limit) { + fmt::format_to( + error_msg, "{}", + "the length of input string is too long than vec schema. "); + fmt::format_to(error_msg, "first 32 bytes of input str: [{}] ", + str_val.to_prefix(32)); + fmt::format_to(error_msg, "schema length: {}; ", type.len); + fmt::format_to(error_msg, "limit length: {}; ", limit); + fmt::format_to(error_msg, "actual length: {}; ", str_val.size); + } + RETURN_IF_ERROR(set_invalid_and_append_error_msg(row)); } - RETURN_IF_ERROR(set_invalid_and_append_error_msg(row)); } } } break; } case TYPE_JSONB: { - const auto column_string = + const auto* column_string = assert_cast<const vectorized::ColumnString*>(real_column_ptr.get()); - for (size_t j = 0; j < column->size(); ++j) { - if (!_filter_bitmap.Get(j)) { + for (size_t j = 0; j < row_count; ++j) { + if (!_filter_map[j]) { if (is_nullable && column_ptr && column_ptr->is_null_at(j)) { continue; } @@ -248,16 +254,13 @@ Status OlapTableBlockConvertor::_validate_column(RuntimeState* state, const Type break; } case TYPE_DECIMALV2: { - auto column_decimal = const_cast<vectorized::ColumnDecimal<vectorized::Decimal128>*>( + auto* column_decimal = const_cast<vectorized::ColumnDecimal<vectorized::Decimal128>*>( assert_cast<const vectorized::ColumnDecimal<vectorized::Decimal128>*>( real_column_ptr.get())); const auto& max_decimalv2 = _get_decimalv2_min_or_max<false>(type); const auto& min_decimalv2 = _get_decimalv2_min_or_max<true>(type); - for (size_t j = 0; j < column->size(); ++j) { + for (size_t j = 0; j < row_count; ++j) { auto row = rows ? (*rows)[j] : j; - if (row == last_invalid_row) { - continue; - } if (need_to_validate(j, row)) { auto dec_val = binary_cast<vectorized::Int128, DecimalV2Value>( column_decimal->get_data()[j]); @@ -284,7 +287,6 @@ Status OlapTableBlockConvertor::_validate_column(RuntimeState* state, const Type } if (invalid) { - last_invalid_row = row; RETURN_IF_ERROR(set_invalid_and_append_error_msg(row)); } } @@ -292,31 +294,36 @@ Status OlapTableBlockConvertor::_validate_column(RuntimeState* state, const Type break; } case TYPE_DECIMAL32: { -#define CHECK_VALIDATION_FOR_DECIMALV3(DecimalType) \ - auto column_decimal = const_cast<vectorized::ColumnDecimal<DecimalType>*>( \ - assert_cast<const vectorized::ColumnDecimal<DecimalType>*>(real_column_ptr.get())); \ - const auto& max_decimal = _get_decimalv3_min_or_max<DecimalType, false>(type); \ - const auto& min_decimal = _get_decimalv3_min_or_max<DecimalType, true>(type); \ - for (size_t j = 0; j < column->size(); ++j) { \ - auto row = rows ? (*rows)[j] : j; \ - if (row == last_invalid_row) { \ - continue; \ - } \ - if (need_to_validate(j, row)) { \ - auto dec_val = column_decimal->get_data()[j]; \ - bool invalid = false; \ - if (dec_val > max_decimal || dec_val < min_decimal) { \ - fmt::format_to(error_msg, "{}", "decimal value is not valid for definition"); \ - fmt::format_to(error_msg, ", value={}", dec_val); \ - fmt::format_to(error_msg, ", precision={}, scale={}", type.precision, type.scale); \ - fmt::format_to(error_msg, ", min={}, max={}; ", min_decimal, max_decimal); \ - invalid = true; \ - } \ - if (invalid) { \ - last_invalid_row = row; \ - RETURN_IF_ERROR(set_invalid_and_append_error_msg(row)); \ - } \ - } \ +#define CHECK_VALIDATION_FOR_DECIMALV3(DecimalType) \ + auto column_decimal = const_cast<vectorized::ColumnDecimal<DecimalType>*>( \ + assert_cast<const vectorized::ColumnDecimal<DecimalType>*>(real_column_ptr.get())); \ + const auto& max_decimal = _get_decimalv3_min_or_max<DecimalType, false>(type); \ + const auto& min_decimal = _get_decimalv3_min_or_max<DecimalType, true>(type); \ + const auto* __restrict datas = column_decimal->get_data().data(); \ + int invalid_count = 0; \ + for (int j = 0; j < row_count; ++j) { \ + const auto dec_val = datas[j]; \ + invalid_count += dec_val > max_decimal || dec_val < min_decimal; \ + } \ + if (invalid_count) { \ + for (size_t j = 0; j < row_count; ++j) { \ + auto row = rows ? (*rows)[j] : j; \ + if (need_to_validate(j, row)) { \ + auto dec_val = column_decimal->get_data()[j]; \ + bool invalid = false; \ + if (dec_val > max_decimal || dec_val < min_decimal) { \ + fmt::format_to(error_msg, "{}", "decimal value is not valid for definition"); \ + fmt::format_to(error_msg, ", value={}", dec_val); \ + fmt::format_to(error_msg, ", precision={}, scale={}", type.precision, \ + type.scale); \ + fmt::format_to(error_msg, ", min={}, max={}; ", min_decimal, max_decimal); \ + invalid = true; \ + } \ + if (invalid) { \ + RETURN_IF_ERROR(set_invalid_and_append_error_msg(row)); \ + } \ + } \ + } \ } CHECK_VALIDATION_FOR_DECIMALV3(vectorized::Decimal32); break; @@ -331,13 +338,13 @@ Status OlapTableBlockConvertor::_validate_column(RuntimeState* state, const Type } #undef CHECK_VALIDATION_FOR_DECIMALV3 case TYPE_ARRAY: { - const auto column_array = + const auto* column_array = assert_cast<const vectorized::ColumnArray*>(real_column_ptr.get()); DCHECK(type.children.size() == 1); auto nested_type = type.children[0]; const auto& offsets = column_array->get_offsets(); vectorized::IColumn::Permutation permutation(offsets.back()); - for (size_t r = 0; r < offsets.size(); ++r) { + for (size_t r = 0; r < row_count; ++r) { for (size_t c = offsets[r - 1]; c < offsets[r]; ++c) { permutation[c] = rows ? (*rows)[r] : r; } @@ -345,7 +352,7 @@ Status OlapTableBlockConvertor::_validate_column(RuntimeState* state, const Type fmt::format_to(error_prefix, "ARRAY type failed: "); RETURN_IF_ERROR(_validate_column(state, nested_type, type.contains_nulls[0], column_array->get_data_ptr(), slot_index, stop_processing, - error_prefix, &permutation)); + error_prefix, permutation.size(), &permutation)); break; } case TYPE_MAP: { @@ -355,7 +362,7 @@ Status OlapTableBlockConvertor::_validate_column(RuntimeState* state, const Type auto val_type = type.children[1]; const auto& offsets = column_map->get_offsets(); vectorized::IColumn::Permutation permutation(offsets.back()); - for (size_t r = 0; r < offsets.size(); ++r) { + for (size_t r = 0; r < row_count; ++r) { for (size_t c = offsets[r - 1]; c < offsets[r]; ++c) { permutation[c] = rows ? (*rows)[r] : r; } @@ -363,10 +370,10 @@ Status OlapTableBlockConvertor::_validate_column(RuntimeState* state, const Type fmt::format_to(error_prefix, "MAP type failed: "); RETURN_IF_ERROR(_validate_column(state, key_type, type.contains_nulls[0], column_map->get_keys_ptr(), slot_index, stop_processing, - error_prefix, &permutation)); + error_prefix, permutation.size(), &permutation)); RETURN_IF_ERROR(_validate_column(state, val_type, type.contains_nulls[1], column_map->get_values_ptr(), slot_index, stop_processing, - error_prefix, &permutation)); + error_prefix, permutation.size(), &permutation)); break; } case TYPE_STRUCT: { @@ -377,7 +384,8 @@ Status OlapTableBlockConvertor::_validate_column(RuntimeState* state, const Type for (size_t sc = 0; sc < column_struct->tuple_size(); ++sc) { RETURN_IF_ERROR(_validate_column(state, type.children[sc], type.contains_nulls[sc], column_struct->get_column_ptr(sc), slot_index, - stop_processing, error_prefix)); + stop_processing, error_prefix, + column_struct->get_column_ptr(sc)->size())); } break; } @@ -390,15 +398,11 @@ Status OlapTableBlockConvertor::_validate_column(RuntimeState* state, const Type // 1. column is nullable but the desc is not nullable // 2. desc->type is BITMAP if ((!is_nullable || type == TYPE_OBJECT) && column_ptr) { - for (int j = 0; j < column->size(); ++j) { + for (int j = 0; j < row_count; ++j) { auto row = rows ? (*rows)[j] : j; - if (row == last_invalid_row) { - continue; - } - if (null_map[j] && !_filter_bitmap.Get(row)) { + if (null_map[j] && !_filter_map[row]) { fmt::format_to(error_msg, "null value for not null column, type={}", type.debug_string()); - last_invalid_row = row; RETURN_IF_ERROR(set_invalid_and_append_error_msg(row)); } } @@ -408,7 +412,8 @@ Status OlapTableBlockConvertor::_validate_column(RuntimeState* state, const Type } Status OlapTableBlockConvertor::_validate_data(RuntimeState* state, vectorized::Block* block, - int64_t& filtered_rows, bool* stop_processing) { + const uint32_t rows, int& filtered_rows, + bool* stop_processing) { for (int i = 0; i < _output_tuple_desc->slots().size(); ++i) { SlotDescriptor* desc = _output_tuple_desc->slots()[i]; DCHECK(block->columns() > i) @@ -423,12 +428,12 @@ Status OlapTableBlockConvertor::_validate_data(RuntimeState* state, vectorized:: fmt::memory_buffer error_prefix; fmt::format_to(error_prefix, "column_name[{}], ", desc->col_name()); RETURN_IF_ERROR(_validate_column(state, desc->type(), desc->is_nullable(), column, i, - stop_processing, error_prefix)); + stop_processing, error_prefix, rows)); } filtered_rows = 0; - for (int i = 0; i < block->rows(); ++i) { - filtered_rows += _filter_bitmap.Get(i); + for (int i = 0; i < rows; ++i) { + filtered_rows += _filter_map[i]; } return Status::OK(); } diff --git a/be/src/vec/sink/vtablet_block_convertor.h b/be/src/vec/sink/vtablet_block_convertor.h index bf5148cf0a7..27440c628be 100644 --- a/be/src/vec/sink/vtablet_block_convertor.h +++ b/be/src/vec/sink/vtablet_block_convertor.h @@ -40,14 +40,14 @@ namespace doris::vectorized { class OlapTableBlockConvertor { public: OlapTableBlockConvertor(TupleDescriptor* output_tuple_desc) - : _output_tuple_desc(output_tuple_desc), _filter_bitmap(1024) {} + : _output_tuple_desc(output_tuple_desc) {} Status validate_and_convert_block(RuntimeState* state, vectorized::Block* input_block, std::shared_ptr<vectorized::Block>& block, vectorized::VExprContextSPtrs output_vexpr_ctxs, size_t rows, bool& has_filtered_rows); - const Bitmap& filter_bitmap() { return _filter_bitmap; } + const char* filter_map() const { return _filter_map.data(); } int64_t validate_data_ns() const { return _validate_data_ns; } @@ -66,15 +66,15 @@ private: Status _validate_column(RuntimeState* state, const TypeDescriptor& type, bool is_nullable, vectorized::ColumnPtr column, size_t slot_index, bool* stop_processing, - fmt::memory_buffer& error_prefix, + fmt::memory_buffer& error_prefix, const uint32_t row_count, vectorized::IColumn::Permutation* rows = nullptr); // make input data valid for OLAP table // return number of invalid/filtered rows. // invalid row number is set in Bitmap // set stop_processing if we want to stop the whole process now. - Status _validate_data(RuntimeState* state, vectorized::Block* block, int64_t& filtered_rows, - bool* stop_processing); + Status _validate_data(RuntimeState* state, vectorized::Block* block, const uint32_t rows, + int& filtered_rows, bool* stop_processing); // some output column of output expr may have different nullable property with dest slot desc // so here need to do the convert operation @@ -94,7 +94,7 @@ private: std::map<int, int128_t> _max_decimal128_val; std::map<int, int128_t> _min_decimal128_val; - Bitmap _filter_bitmap; + std::vector<char> _filter_map; int64_t _validate_data_ns = 0; int64_t _num_filtered_rows = 0; diff --git a/be/src/vec/sink/vtablet_sink_v2.cpp b/be/src/vec/sink/vtablet_sink_v2.cpp index 45fa48cb327..13d25f5cf46 100644 --- a/be/src/vec/sink/vtablet_sink_v2.cpp +++ b/be/src/vec/sink/vtablet_sink_v2.cpp @@ -284,9 +284,10 @@ Status VOlapTableSinkV2::send(RuntimeState* state, vectorized::Block* input_bloc RowsForTablet rows_for_tablet; _tablet_finder->clear_for_new_batch(); _row_distribution_watch.start(); - auto num_rows = block->rows(); + const auto num_rows = input_rows; + const auto* __restrict filter_map = _block_convertor->filter_map(); for (int i = 0; i < num_rows; ++i) { - if (UNLIKELY(has_filtered_rows) && _block_convertor->filter_bitmap().Get(i)) { + if (UNLIKELY(has_filtered_rows) && filter_map[i]) { continue; } const VOlapTablePartition* partition = nullptr; diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp index c0676a5670b..7035653ee72 100644 --- a/be/src/vec/sink/writer/vtablet_writer.cpp +++ b/be/src/vec/sink/writer/vtablet_writer.cpp @@ -1343,7 +1343,7 @@ Status VTabletWriter::_single_partition_generate(RuntimeState* state, vectorized uint32_t tablet_index = 0; bool stop_processing = false; for (int32_t i = 0; i < num_rows; ++i) { - if (UNLIKELY(has_filtered_rows) && _block_convertor->filter_bitmap().Get(i)) { + if (UNLIKELY(has_filtered_rows) && _block_convertor->filter_map()[i]) { continue; } bool is_continue = false; @@ -1373,7 +1373,7 @@ Status VTabletWriter::_single_partition_generate(RuntimeState* state, vectorized auto& selector = channel_to_payload[j][channel.get()].first; auto& tablet_ids = channel_to_payload[j][channel.get()].second; for (int32_t i = 0; i < num_rows; ++i) { - if (UNLIKELY(has_filtered_rows) && _block_convertor->filter_bitmap().Get(i)) { + if (UNLIKELY(has_filtered_rows) && _block_convertor->filter_map()[i]) { continue; } selector->push_back(i); @@ -1712,7 +1712,7 @@ Status VTabletWriter::append_block(doris::vectorized::Block& input_block) { // try to find tablet and save missing value for (int i = 0; i < num_rows; ++i) { - if (UNLIKELY(has_filtered_rows) && _block_convertor->filter_bitmap().Get(i)) { + if (UNLIKELY(has_filtered_rows) && _block_convertor->filter_map()[i]) { continue; } const VOlapTablePartition* partition = nullptr; @@ -1761,7 +1761,7 @@ Status VTabletWriter::append_block(doris::vectorized::Block& input_block) { } // creating done } else { // not auto partition for (int i = 0; i < num_rows; ++i) { - if (UNLIKELY(has_filtered_rows) && _block_convertor->filter_bitmap().Get(i)) { + if (UNLIKELY(has_filtered_rows) && _block_convertor->filter_map()[i]) { continue; } const VOlapTablePartition* partition = nullptr; @@ -1794,7 +1794,7 @@ Status VTabletWriter::append_block(doris::vectorized::Block& input_block) { vectorized::IColumn::Filter& filter_col = static_cast<vectorized::ColumnUInt8*>(filter.get())->get_data(); for (size_t i = 0; i < filter_col.size(); ++i) { - filter_data[i] = !_block_convertor->filter_bitmap().Get(i); + filter_data[i] = !_block_convertor->filter_map()[i]; } RETURN_IF_CATCH_EXCEPTION(vectorized::Block::filter_block_internal( block.get(), filter_col, block->columns())); @@ -1849,8 +1849,7 @@ Status VTabletWriter::write_wal(OlapTableBlockConvertor* block_convertor, auto cloneBlock = block->clone_without_columns(); auto res_block = vectorized::MutableBlock::build_mutable_block(&cloneBlock); for (int i = 0; i < num_rows; ++i) { - if (block_convertor->num_filtered_rows() > 0 && - block_convertor->filter_bitmap().Get(i)) { + if (block_convertor->num_filtered_rows() > 0 && block_convertor->filter_map()[i]) { continue; } if (tablet_finder->num_filtered_rows() > 0 && tablet_finder->filter_bitmap().Get(i)) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org