kangkaisen commented on a change in pull request #3742: URL: https://github.com/apache/incubator-doris/pull/3742#discussion_r433316888
########## File path: be/src/olap/push_handler.cpp ########## @@ -761,6 +915,126 @@ OLAPStatus LzoBinaryReader::_next_block() { return res; } +OLAPStatus PushBrokerReader::init(const Schema* schema, + const TBrokerScanRange& t_scan_range, + const TDescriptorTable& t_desc_tbl) { + // init schema + _schema = schema; + + // init runtime state, runtime profile, counter + TUniqueId dummy_id; + dummy_id.hi = 0; + dummy_id.lo = 0; + TPlanFragmentExecParams params; + params.fragment_instance_id = dummy_id; + params.query_id = dummy_id; + TExecPlanFragmentParams fragment_params; + fragment_params.params = params; + fragment_params.protocol_version = PaloInternalServiceVersion::V1; + TQueryOptions query_options; + TQueryGlobals query_globals; + _runtime_state.reset(new RuntimeState(fragment_params, query_options, query_globals, + ExecEnv::GetInstance())); + DescriptorTbl* desc_tbl = NULL; + Status status = DescriptorTbl::create(_runtime_state->obj_pool(), t_desc_tbl, &desc_tbl); + if (UNLIKELY(!status.ok())) { + LOG(WARNING) << "Failed to create descriptor table, msg: " << status.get_error_msg(); + return OLAP_ERR_PUSH_INIT_ERROR; + } + _runtime_state->set_desc_tbl(desc_tbl); + status = _runtime_state->init_mem_trackers(dummy_id); + if (UNLIKELY(!status.ok())) { + LOG(WARNING) << "Failed to init mem trackers, msg: " << status.get_error_msg(); + return OLAP_ERR_PUSH_INIT_ERROR; + } + _runtime_profile.reset(new RuntimeProfile(_runtime_state->obj_pool(), "PushBrokerReader")); + _mem_tracker.reset(new MemTracker(-1)); + _mem_pool.reset(new MemPool(_mem_tracker.get())); + _counter.reset(new ScannerCounter()); + + // init scanner + BaseScanner *scanner = nullptr; + switch (t_scan_range.ranges[0].format_type) { + case TFileFormatType::FORMAT_PARQUET: + scanner = new ParquetScanner(_runtime_state.get(), + _runtime_profile.get(), + t_scan_range.params, + t_scan_range.ranges, + t_scan_range.broker_addresses, + _counter.get()); + break; + default: + LOG(WARNING) << "Unsupported file 
format type: " << t_scan_range.ranges[0].format_type; + return OLAP_ERR_PUSH_INIT_ERROR; + } + _scanner.reset(scanner); + status = _scanner->open(); + if (UNLIKELY(!status.ok())) { + LOG(WARNING) << "Failed to open scanner, msg: " << status.get_error_msg(); + return OLAP_ERR_PUSH_INIT_ERROR; + } + + // init tuple + auto tuple_id = t_scan_range.params.dest_tuple_id; + _tuple_desc = _runtime_state->desc_tbl().get_tuple_descriptor(tuple_id); + if (_tuple_desc == nullptr) { + std::stringstream ss; + LOG(WARNING) << "Failed to get tuple descriptor, tuple_id: " << tuple_id; + return OLAP_ERR_PUSH_INIT_ERROR; + } + + int tuple_buffer_size = _tuple_desc->byte_size(); + void* tuple_buffer = _mem_pool->allocate(tuple_buffer_size); + if (tuple_buffer == nullptr) { + LOG(WARNING) << "Allocate memory for tuple failed"; + return OLAP_ERR_PUSH_INIT_ERROR; + } + _tuple = reinterpret_cast<Tuple*>(tuple_buffer); + + _ready = true; + return OLAP_SUCCESS; +} + +OLAPStatus PushBrokerReader::next(ContiguousRow* row) { + if (!_ready || row == nullptr) { + return OLAP_ERR_INPUT_PARAMETER_ERROR; + } + + memset(_tuple, 0, _tuple_desc->num_null_bytes()); + // Get from scanner + Status status = _scanner->get_next(_tuple, _mem_pool.get(), &_eof); + if (UNLIKELY(!status.ok())) { + LOG(WARNING) << "Scanner get next tuple failed"; + return OLAP_ERR_PUSH_INPUT_DATA_ERROR; + } + if (_eof) { + return OLAP_SUCCESS; + } + //LOG(INFO) << "row data: " << _tuple->to_string(*_tuple_desc); + + auto slot_descs = _tuple_desc->slots(); + size_t num_key_columns = _schema->num_key_columns(); + for (size_t i = 0; i < slot_descs.size(); ++i) { + auto cell = row->cell(i); + const SlotDescriptor* slot = slot_descs[i]; + bool is_null = _tuple->is_null(slot->null_indicator_offset()); + const void* value = _tuple->get_slot(slot->tuple_offset()); + _schema->column(i)->consume(&cell, (const char*)value, is_null, + _mem_pool.get(), _runtime_state->obj_pool()); + if (i >= num_key_columns) { + 
_schema->column(i)->agg_finalize(&cell, _mem_pool.get()); Review comment: Add a comment for these lines of code. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org