This is an automated email from the ASF dual-hosted git repository. zhaoc pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new 484e7de [Doirs On ES] fix bug for sparse docvalue context and remove the mistake usage of total (#3751) 484e7de is described below commit 484e7de3c58f2f73e79a5f752b47d6d5b8f3aea1 Author: Yunfeng,Wu <wuyunfen...@baidu.com> AuthorDate: Thu Jun 4 16:31:18 2020 +0800 [Doirs On ES] fix bug for sparse docvalue context and remove the mistake usage of total (#3751) An earlier PR, https://github.com/apache/incubator-doris/pull/3513 (https://github.com/apache/incubator-doris/issues/3479), tried to resolve the `inner hits node is not an array` error. That error occurs when a query (batch-size) runs against a new segment that does not contain the requested field: because the filter_path only takes `hits.hits.fields` and `hits.hits._source` into account, the response contains no inner hits node: ``` { "_scroll_id": "DXF1ZXJ5QW5kRmV0Y2gBAAAAAAAAHaUWY1ExUVd0ZWlRY2", "hits": { "total": 1 } } ``` Unfortunately, that PR introduced another serious problem: results became inconsistent across different batch_size values because it misused `total`. 
To avoid both problems, we simply add `hits.hits._score` to filter_path when `docvalue_mode` is true. `_score` will always be `null`, but including it ensures the inner hits node is populated: ``` { "_scroll_id": "DXF1ZXJ5QW5kRmV0Y2gBAAAAAAAAHaUWY1ExUVd0ZWlRY2", "hits": { "total": 1, "hits": [ { "_score": null } ] } } ``` Related issue: https://github.com/apache/incubator-doris/issues/3752 --- be/src/exec/es/es_scan_reader.cpp | 15 +++++++------- be/src/exec/es/es_scroll_parser.cpp | 41 ++++--------------------------------- 2 files changed, 12 insertions(+), 44 deletions(-) diff --git a/be/src/exec/es/es_scan_reader.cpp b/be/src/exec/es/es_scan_reader.cpp index 4d66738..43af786 100644 --- a/be/src/exec/es/es_scan_reader.cpp +++ b/be/src/exec/es/es_scan_reader.cpp @@ -27,14 +27,15 @@ #include "exec/es/es_scroll_query.h" namespace doris { -const std::string REUQEST_SCROLL_FILTER_PATH = "filter_path=_scroll_id,hits.hits._source,hits.total,_id,hits.hits._source.fields,hits.hits.fields"; + +const std::string SOURCE_SCROLL_SEARCH_FILTER_PATH = "filter_path=_scroll_id,hits.hits._source,hits.total,_id"; +const std::string DOCVALUE_SCROLL_SEARCH_FILTER_PATH = "filter_path=_scroll_id,hits.total,hits.hits._score,hits.hits.fields"; + const std::string REQUEST_SCROLL_PATH = "_scroll"; const std::string REQUEST_PREFERENCE_PREFIX = "&preference=_shards:"; const std::string REQUEST_SEARCH_SCROLL_PATH = "/_search/scroll"; const std::string REQUEST_SEPARATOR = "/"; -const std::string REQUEST_SEARCH_FILTER_PATH = "filter_path=hits.hits._source,hits.total,_id,hits.hits._source.fields,hits.hits.fields"; - ESScanReader::ESScanReader(const std::string& target, const std::map<std::string, std::string>& props, bool doc_value_mode) : _scroll_keep_alive(config::es_scroll_keepalive), _http_timeout_ms(config::es_http_timeout_ms), @@ -57,6 +58,7 @@ ESScanReader::ESScanReader(const std::string& target, const std::map<std::string std::string batch_size_str = props.at(KEY_BATCH_SIZE); _batch_size = 
atoi(batch_size_str.c_str()); + std::string filter_path = _doc_value_mode ? DOCVALUE_SCROLL_SEARCH_FILTER_PATH : SOURCE_SCROLL_SEARCH_FILTER_PATH; if (props.find(KEY_TERMINATE_AFTER) != props.end()) { _exactly_once = true; @@ -65,7 +67,7 @@ ESScanReader::ESScanReader(const std::string& target, const std::map<std::string scratch << _target << REQUEST_SEPARATOR << _index << REQUEST_SEPARATOR << _type << "/_search?" << "terminate_after=" << props.at(KEY_TERMINATE_AFTER) << REQUEST_PREFERENCE_PREFIX << _shards - << "&" << REUQEST_SCROLL_FILTER_PATH; + << "&" << filter_path; _search_url = scratch.str(); } else { _exactly_once = false; @@ -75,13 +77,12 @@ ESScanReader::ESScanReader(const std::string& target, const std::map<std::string scratch << _target << REQUEST_SEPARATOR << _index << REQUEST_SEPARATOR << _type << "/_search?" << "scroll=" << _scroll_keep_alive << REQUEST_PREFERENCE_PREFIX << _shards - << "&" << REUQEST_SCROLL_FILTER_PATH + << "&" << filter_path << "&terminate_after=" << batch_size_str; _init_scroll_url = scratch.str(); - _next_scroll_url = _target + REQUEST_SEARCH_SCROLL_PATH + "?" + REUQEST_SCROLL_FILTER_PATH; + _next_scroll_url = _target + REQUEST_SEARCH_SCROLL_PATH + "?" + filter_path; } _eos = false; - } ESScanReader::~ESScanReader() { diff --git a/be/src/exec/es/es_scroll_parser.cpp b/be/src/exec/es/es_scroll_parser.cpp index 1fa9a03..16f913d 100644 --- a/be/src/exec/es/es_scroll_parser.cpp +++ b/be/src/exec/es/es_scroll_parser.cpp @@ -230,6 +230,7 @@ Status ScrollParser::parse(const std::string& scroll_result, bool exactly_once) const rapidjson::Value &outer_hits_node = _document_node[FIELD_HITS]; const rapidjson::Value &field_total = outer_hits_node[FIELD_TOTAL]; // after es 7.x "total": { "value": 1, "relation": "eq" } + // it is not necessary to parse `total`, this logic would be removed the another pr. 
if (field_total.IsObject()) { const rapidjson::Value &field_relation_value = field_total["relation"]; std::string relation = field_relation_value.GetString(); @@ -242,26 +243,16 @@ Status ScrollParser::parse(const std::string& scroll_result, bool exactly_once) } else { _total = field_total.GetInt(); } - + // just used for the first scroll, maybe we should remove this logic from the `get_next` if (_total == 0) { return Status::OK(); } VLOG(1) << "es_scan_reader parse scroll result: " << scroll_result; - if (!outer_hits_node.HasMember(FIELD_INNER_HITS)) { - // this is caused by query some columns which are not exit, e.g. - // A Index has fields: k1,k2,k3. and we put some rows into this Index (some fields dose NOT contain any data) - // e.g. - // put index/_doc/1 {"k2":"123"} - // put index/_doc/2 {"k3":"123} - // then we use sql `select k1 from table` - // what ES return is like this: {hits: {total:2} - return Status::OK(); - } const rapidjson::Value &inner_hits_node = outer_hits_node[FIELD_INNER_HITS]; + // this happened just the end of scrolling if (!inner_hits_node.IsArray()) { - LOG(WARNING) << "exception maybe happend on es cluster, reponse:" << scroll_result; - return Status::InternalError("inner hits node is not an array"); + return Status::OK(); } rapidjson::Document::AllocatorType& a = _document_node.GetAllocator(); @@ -288,30 +279,6 @@ Status ScrollParser::fill_tuple(const TupleDescriptor* tuple_desc, *line_eof = true; if (_size <= 0 || _line_index >= _size) { - // _source is fetched from ES - if (!_doc_value_mode) { - return Status::OK(); - } - - // _fields(doc_value) is fetched from ES - if (_total <= 0 || _line_index >= _total) { - return Status::OK(); - } - - - // here is operations for `enable_doc_value_scan`. - // This indicates that the fields does not exist(e.g. never assign values to these fields), but other fields have values. - // so, number of rows is >= 0, we need fill `NULL` to these fields that does not exist. 
- _line_index++; - tuple->init(tuple_desc->byte_size()); - for (int i = 0; i < tuple_desc->slots().size(); ++i) { - const SlotDescriptor* slot_desc = tuple_desc->slots()[i]; - if (slot_desc->is_materialized()) { - tuple->set_null(slot_desc->null_indicator_offset()); - } - } - - *line_eof = false; return Status::OK(); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org