morningman commented on a change in pull request #3230: URL: https://github.com/apache/incubator-doris/pull/3230#discussion_r413883698
########## File path: be/src/exec/broker_reader.h ########## @@ -52,6 +52,7 @@ class BrokerReader : public FileReader { // Read virtual Status read(uint8_t* buf, size_t* buf_len, bool* eof) override; virtual Status readat(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) override; + virtual Status read(uint8_t** buf, size_t *length) override; Review comment: Better named `read_all` ########## File path: be/src/exec/json_scanner.h ########## @@ -0,0 +1,146 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef BE_SRC_JSON_SCANNER_H_ +#define BE_SRC_JSON_SCANNER_H_ + +#include <memory> +#include <vector> +#include <string> +#include <map> +#include <sstream> +#include <rapidjson/document.h> + +#include "exec/base_scanner.h" +#include "common/status.h" +#include "gen_cpp/PlanNodes_types.h" +#include "gen_cpp/Types_types.h" +#include "util/slice.h" +#include "util/runtime_profile.h" +#include "runtime/mem_pool.h" +#include "runtime/tuple.h" +#include "runtime/descriptors.h" +#include "runtime/stream_load/load_stream_mgr.h" +#include "runtime/small_file_mgr.h" + +namespace doris { +class Tuple; +class SlotDescriptor; +class RuntimeState; +class TupleDescriptor; +class MemTracker; +class JsonReader; + +class JsonScanner : public BaseScanner { +public: + JsonScanner( + RuntimeState* state, + RuntimeProfile* profile, + const TBrokerScanRangeParams& params, + const std::vector<TBrokerRangeDesc>& ranges, + const std::vector<TNetworkAddress>& broker_addresses, + ScannerCounter* counter); + ~JsonScanner(); + + // Open this scanner, will initialize information need to + Status open() override; + + // Get next tuple + Status get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof) override; + + // Close this scanner + void close() override; +private: + Status open_next_reader(); +private: + const std::vector<TBrokerRangeDesc>& _ranges; + const std::vector<TNetworkAddress>& _broker_addresses; + + std::string _jsonpath; + std::string _jsonpath_file; + // std::shared_ptr<rapidjson::Document> _jsonPathDoc; + + // used to hold current StreamLoadPipe + std::shared_ptr<StreamLoadPipe> _stream_load_pipe; + // Reader + JsonReader* _cur_file_reader; + int _next_range; + bool _cur_file_eof; // is read over? 
+ bool _scanner_eof; +}; + + +class JsonDataInternal { +public: + JsonDataInternal(rapidjson::Value* v); + ~JsonDataInternal(); + rapidjson::Value::ConstValueIterator get_next(); + +private: + bool is_end(); + +private: + rapidjson::Value* _json_values; + rapidjson::Value::ConstValueIterator _iterator; +}; + +class JsonReader { +public: + JsonReader(RuntimeState* state, ScannerCounter* counter, SmallFileMgr *fileMgr, RuntimeProfile* profile, FileReader* file_reader, + std::string& jsonpath, std::string& jsonpath_file); + ~JsonReader(); + Status read(Tuple* tuple, std::vector<SlotDescriptor*> slot_descs, MemPool* tuple_pool, bool* eof); + +private: + void init_jsonpath(SmallFileMgr *fileMgr, std::string& jsonpath, std::string& jsonpath_file); + void fill_slot(Tuple* tuple, SlotDescriptor* slot_desc, MemPool* mem_pool, const uint8_t* value, int32_t len); + size_t get_data_by_jsonpath(); + int parse_jsonpath_from_file(SmallFileMgr *smallFileMgr, std::string& fileinfo); + Status parse_json_doc(bool *eof); + Status set_tuple_value(rapidjson::Value& objectValue, Tuple* tuple, std::vector<SlotDescriptor*> slot_descs, MemPool* tuple_pool, bool *valid, int *nullcount); + Status set_tuple_value_from_map(Tuple* tuple, std::vector<SlotDescriptor*> slot_descs, MemPool* tuple_pool, bool *valid); + Status handle_simple_json(Tuple* tuple, std::vector<SlotDescriptor*> slot_descs, MemPool* tuple_pool, bool* eof); + Status handle_complex_json(Tuple* tuple, std::vector<SlotDescriptor*> slot_descs, MemPool* tuple_pool, bool* eof); + Status write_data_to_tuple(rapidjson::Value::ConstValueIterator value, SlotDescriptor* desc, Tuple* tuple, MemPool* tuple_pool); + void close(); + +private: + static constexpr char const *JSON_PATH = "jsonpath"; + static constexpr char const *DORIS_DATA = "doris_data"; Review comment: Better to change it to "RECORDS", to be compatible with MySQL ########## File path: be/src/exec/json_scanner.cpp ########## @@ -0,0 +1,517 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ + +#include "exec/json_scanner.h" +#include <algorithm> +#include "gutil/strings/split.h" +#include "runtime/exec_env.h" +#include "runtime/mem_tracker.h" +#include "runtime/raw_value.h" +#include "runtime/runtime_state.h" +#include "exprs/expr.h" +#include "env/env.h" +#include "exec/local_file_reader.h" +#include "exec/broker_reader.h" +#include "exprs/json_functions.h" + +namespace doris { + +JsonScanner::JsonScanner(RuntimeState* state, + RuntimeProfile* profile, + const TBrokerScanRangeParams& params, + const std::vector<TBrokerRangeDesc>& ranges, + const std::vector<TNetworkAddress>& broker_addresses, + ScannerCounter* counter) : BaseScanner(state, profile, params, counter), + _ranges(ranges), + _broker_addresses(broker_addresses), + _cur_file_reader(nullptr), + _next_range(0), + _cur_file_eof(false), + _scanner_eof(false) { + +} + +JsonScanner::~JsonScanner() { + close(); +} + +Status JsonScanner::open() { + return BaseScanner::open(); +} + +Status JsonScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof) { + SCOPED_TIMER(_read_timer); + // Get one line + while (!_scanner_eof) { + if (_cur_file_reader == nullptr || _cur_file_eof) { + RETURN_IF_ERROR(open_next_reader()); + // If there isn't any more reader, break this + if (_scanner_eof) { + continue; + } + _cur_file_eof = false; + } + RETURN_IF_ERROR(_cur_file_reader->read(_src_tuple, _src_slot_descs, tuple_pool, &_cur_file_eof)); + + if (_cur_file_eof) { + continue; // read next file + } + COUNTER_UPDATE(_rows_read_counter, 1); + SCOPED_TIMER(_materialize_timer); + if (fill_dest_tuple(Slice(), tuple, tuple_pool)) { + break;// break if true + } + } + if (_scanner_eof) { + *eof = true; + } else { + *eof = false; + } + return Status::OK(); +} + +Status JsonScanner::open_next_reader() { + if (_cur_file_reader != nullptr) { + delete _cur_file_reader; + _cur_file_reader = nullptr; + if (_stream_load_pipe != nullptr) { + _stream_load_pipe.reset(); + } + } + if (_next_range >= _ranges.size()) { + _scanner_eof = true; + return Status::OK(); + } + const TBrokerRangeDesc& range = _ranges[_next_range++]; + int64_t start_offset = range.start_offset; + if (start_offset != 0) { + start_offset -= 1; + } + FileReader *file = NULL; + switch (range.file_type) { + case TFileType::FILE_LOCAL: { + LocalFileReader* file_reader = new LocalFileReader(range.path, start_offset); + RETURN_IF_ERROR(file_reader->open()); + file = file_reader; + break; + } + case TFileType::FILE_BROKER: { + BrokerReader* broker_reader = new BrokerReader( + _state->exec_env(), _broker_addresses, _params.properties, range.path, start_offset); + RETURN_IF_ERROR(broker_reader->open()); + file = broker_reader; + break; + } + + case TFileType::FILE_STREAM: { + _stream_load_pipe = _state->exec_env()->load_stream_mgr()->get(range.load_id); + if (_stream_load_pipe == nullptr) { + VLOG(3) << "unknown stream load id: " << UniqueId(range.load_id); + return Status::InternalError("unknown stream load id"); + } + file = _stream_load_pipe.get(); + break; + } + default: { + std::stringstream ss; + ss << "Unknown file type, type=" << range.file_type; + return Status::InternalError(ss.str()); + } + } + + std::string jsonpath = ""; + std::string jsonpath_file = ""; + if (range.__isset.jsonpath) { + jsonpath = range.jsonpath; + } else if (range.__isset.jsonpath_file) { + jsonpath_file = range.jsonpath_file; + } + _cur_file_reader = new JsonReader(_state, _counter, _state->exec_env()->small_file_mgr(), _profile, file, jsonpath, jsonpath_file); + + return Status::OK(); +} + +void 
JsonScanner::close() { + if (_cur_file_reader != nullptr) { + delete _cur_file_reader; + _cur_file_reader = nullptr; + if (_stream_load_pipe != nullptr) { + _stream_load_pipe.reset(); + } + } +} + +////// class JsonDataInternal +JsonDataInternal::JsonDataInternal(rapidjson::Value* v) : + _json_values(v), _iterator(v->Begin()) { +} + +JsonDataInternal::~JsonDataInternal() { + +} +bool JsonDataInternal::is_end() { + return _json_values->End() == _iterator; +} + +rapidjson::Value::ConstValueIterator JsonDataInternal::get_next() { + if (is_end()) { + return nullptr; + } + return _iterator++; +} + + +////// class JsonReader +JsonReader::JsonReader( + RuntimeState* state, ScannerCounter* counter, + SmallFileMgr *fileMgr, + RuntimeProfile* profile, + FileReader* file_reader, + std::string& jsonpath, + std::string& jsonpath_file) : + _next_line(0), + _total_lines(0), + _state(state), + _counter(counter), + _profile(profile), + _file_reader(file_reader), + _closed(false) { + _bytes_read_counter = ADD_COUNTER(_profile, "BytesRead", TUnit::BYTES); + _read_timer = ADD_TIMER(_profile, "FileReadTime"); + + init_jsonpath(fileMgr, jsonpath, jsonpath_file); +} + +JsonReader::~JsonReader() { + close(); +} + +void JsonReader::init_jsonpath(SmallFileMgr *fileMgr, std::string& jsonpath, std::string& jsonpath_file) { + //parse jsonpath + if (!jsonpath.empty()) { + if (!_jsonpath_doc.Parse(jsonpath.c_str()).HasParseError()) { + if (!_jsonpath_doc.HasMember(JsonReader::JSON_PATH) || !_jsonpath_doc[JsonReader::JSON_PATH].IsArray()) { + _parse_jsonpath_flag = -1;// failed, has none object + } else { + _parse_jsonpath_flag = 1;// success + } + } else { + _parse_jsonpath_flag = -1;// parse failed + } + } else if (!jsonpath_file.empty()) { + //Read jsonpath from file, has format: file_id:md5 + _parse_jsonpath_flag = parse_jsonpath_from_file(fileMgr, jsonpath_file); + } else { + _parse_jsonpath_flag = 0; + } + return ; +} + +void JsonReader::close() { + if (_closed) { + return; + } + if (typeid(*_file_reader) == typeid(doris::BrokerReader) || typeid(*_file_reader) == typeid(doris::LocalFileReader)) { + _file_reader->close(); + delete _file_reader; + } + _closed = true; +} + +int JsonReader::parse_jsonpath_from_file(SmallFileMgr *smallFileMgr, std::string& fileinfo ) { + std::vector<std::string> parts = strings::Split(fileinfo, ":", strings::SkipWhitespace()); + if (parts.size() != 2) { + LOG(WARNING)<< "parse_jsonpath_from_file Invalid fileinfo: " << fileinfo; + return -1; + } + int64_t file_id = std::stol(parts[0]); + std::string file_path; + Status st = smallFileMgr->get_file(file_id, parts[1], &file_path); + if (!st.ok()) { + return -1; + } + std::unique_ptr<RandomAccessFile> jsonPathFile; + st = Env::Default()->new_random_access_file(file_path, &jsonPathFile); + if (!st.ok()) { + return -1; + } + uint64_t size = 0; + jsonPathFile->size(&size); + if (size == 0) { + return 0; + } + boost::scoped_array<char> pBuf(new char[size]); + Slice slice(pBuf.get(), size); + st = jsonPathFile->read_at(0, slice); + if (!st.ok()) { + return -1; + } + + if (!_jsonpath_doc.Parse(slice.get_data()).HasParseError()) { + if (!_jsonpath_doc.HasMember(JsonReader::JSON_PATH) || !_jsonpath_doc[JsonReader::JSON_PATH].IsArray()) { + return -1;//failed, has none object + } else { + return 1;// success + } + } else { + return -1;// parse failed + } +} + +Status JsonReader::parse_json_doc(bool *eof) { + // read all, must be delete json_str + uint8_t* json_str = nullptr; + size_t length = 0; + RETURN_IF_ERROR(_file_reader->read(&json_str, 
&length)); + if (length == 0) { + *eof = true; + return Status::OK(); + } + // parse jsondata to JsonDoc + if (_json_doc.Parse((char*)json_str, length).HasParseError()) { + delete[] json_str; + return Status::InternalError("Parse json data for JsonDoc is failed."); + } + delete[] json_str; + return Status::OK(); +} + +size_t JsonReader::get_data_by_jsonpath() { + size_t max_lines = 0; + //iterator jsonpath to find object and save it to Map + _jmap.clear(); + const rapidjson::Value& arrJsonPath = _jsonpath_doc[JsonReader::JSON_PATH]; + for (int i = 0; i < arrJsonPath.Size(); i++) { + const rapidjson::Value& info = arrJsonPath[i]; + if (!info.IsObject() || !info.HasMember("column") || !info.HasMember("value") || + !info["column"].IsString() || !info["value"].IsString()) { + return -1; + } + + std::string column = info["column"].GetString(); + std::string value = info["value"].GetString(); + // if jsonValues is null, because not match in jsondata. + rapidjson::Value* json_values = JsonFunctions::get_json_object_from_parsed_json(value, &_json_doc); + if (json_values == NULL) { + return -1; + } + if (json_values->IsArray()) { + max_lines = std::max(max_lines, (size_t)json_values->Size()); + } else { + max_lines = std::max(max_lines, (size_t)1); + } + _jmap.emplace(column, json_values); + } + return max_lines; +} + +void JsonReader::fill_slot(Tuple* tuple, SlotDescriptor* slot_desc, MemPool* mem_pool, const uint8_t* value, int32_t len) { + tuple->set_not_null(slot_desc->null_indicator_offset()); + void* slot = tuple->get_slot(slot_desc->tuple_offset()); + StringValue* str_slot = reinterpret_cast<StringValue*>(slot); + str_slot->ptr = reinterpret_cast<char*>(mem_pool->allocate(len)); + memcpy(str_slot->ptr, value, len); + str_slot->len = len; + return; +} + +Status JsonReader::write_data_to_tuple(rapidjson::Value::ConstValueIterator value, SlotDescriptor* desc, Tuple* tuple, MemPool* tuple_pool) { + const char *str_value = NULL; + uint8_t tmp_buf[128] = {0}; + int32_t wbytes = 0; + switch (value->GetType()) { + case rapidjson::Type::kStringType: + str_value = value->GetString(); + fill_slot(tuple, desc, tuple_pool, (uint8_t*)str_value, strlen(str_value)); + break; + case rapidjson::Type::kNumberType: + if (value->IsUint()) { + wbytes = sprintf((char*)tmp_buf, "%u", value->GetUint()); + fill_slot(tuple, desc, tuple_pool, tmp_buf, wbytes); + } else if (value->IsInt()) { + wbytes = sprintf((char*)tmp_buf, "%d", value->GetInt()); + fill_slot(tuple, desc, tuple_pool, tmp_buf, wbytes); + } else if (value->IsUint64()) { + wbytes = sprintf((char*)tmp_buf, "%lu", value->GetUint64()); + fill_slot(tuple, desc, tuple_pool, tmp_buf, wbytes); + } else if (value->IsInt64()) { + wbytes = sprintf((char*)tmp_buf, "%ld", value->GetInt64()); + fill_slot(tuple, desc, tuple_pool, tmp_buf, wbytes); + } else { + wbytes = sprintf((char*)tmp_buf, "%f", value->GetDouble()); + fill_slot(tuple, desc, tuple_pool, tmp_buf, wbytes); + } + break; + case rapidjson::Type::kFalseType: + //fill_slot(tuple, desc, tuple_pool, (uint8_t*)"false", 5); + fill_slot(tuple, desc, tuple_pool, (uint8_t*)"0", 1); + break; + case rapidjson::Type::kTrueType: + //fill_slot(tuple, desc, tuple_pool, (uint8_t*)"true", 4); + fill_slot(tuple, desc, tuple_pool, (uint8_t*)"1", 1); + break; + case rapidjson::Type::kNullType: + if (desc->is_nullable()) { + tuple->set_null(desc->null_indicator_offset()); + } else { + std::stringstream str_error; + str_error << "Json value is null, but the column `" << desc->col_name() << "` is not nullable."; + LOG(WARNING) 
<< str_error.str(); + return Status::RuntimeError(str_error.str()); + } + break; + default: + std::stringstream str_error; + str_error << "Invalid JsonType " << value->GetType() << ", Column Name `" << desc->col_name() << "`."; + LOG(WARNING) << str_error.str(); + return Status::RuntimeError(str_error.str()); + } + return Status::OK(); +} + +/** + * handle input a simple json + * For example: + * 1. {"doris_data": [{"colunm1":"value1", "colunm2":10}, {"colunm1":"value2", "colunm2":30}]} + * 2. {"colunm1":"value1", "colunm2":10} + */ +Status JsonReader::set_tuple_value(rapidjson::Value& objectValue, Tuple* tuple, std::vector<SlotDescriptor*> slot_descs, MemPool* tuple_pool, bool *valid, int *nullcount) { + for (auto v : slot_descs) { + if (objectValue.HasMember(v->col_name().c_str())) { + rapidjson::Value& value = objectValue[v->col_name().c_str()]; + RETURN_IF_ERROR(write_data_to_tuple(&value, v, tuple, tuple_pool)); + } else { + if (v->is_nullable()) { + tuple->set_null(v->null_indicator_offset()); + (*nullcount)++; + } else { + std::stringstream str_error; + str_error << "The column `" << v->col_name() << "` is not nullable, but it's not found in jsondata."; + LOG(WARNING) << str_error.str(); + _state->append_error_msg_to_file("", str_error.str()); + _counter->num_rows_filtered++; + *valid = false; // current row is invalid + break; + } + } + } + *valid = true; + return Status::OK(); +} + +Status JsonReader::handle_simple_json(Tuple* tuple, std::vector<SlotDescriptor*> slot_descs, MemPool* tuple_pool, bool* eof) { + do { + bool valid = false; + int nullcount = 0; + if (_next_line >= _total_lines) {//parse json and generic document + RETURN_IF_ERROR(parse_json_doc(eof)); + if (*eof) {// read all data, then return + return Status::OK(); + } + if (_json_doc.HasMember(JsonReader::DORIS_DATA) && _json_doc[JsonReader::DORIS_DATA].IsArray() ) { + _total_lines = _json_doc[JsonReader::DORIS_DATA].Size(); + } else { + _total_lines = 1; // only one row + } + _next_line = 0; + } + + if (_json_doc.HasMember(JsonReader::DORIS_DATA) && _json_doc[JsonReader::DORIS_DATA].IsArray()) {//handle case 1 + rapidjson::Value& valueArray = _json_doc[JsonReader::DORIS_DATA]; + rapidjson::Value& objectValue = valueArray[_next_line];// json object + RETURN_IF_ERROR(set_tuple_value(objectValue, tuple, slot_descs, tuple_pool, &valid, &nullcount)); + } else {// handle case 2 + RETURN_IF_ERROR(set_tuple_value(_json_doc, tuple, slot_descs, tuple_pool, &valid, &nullcount)); + } + _next_line++; + if (!valid || nullcount == slot_descs.size()) {// All fields is null, read next one

Review comment:
If `nullcount == slot_descs.size()` is true, here you skip this line, but do not update `_counter->num_rows_filtered`.
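A minimal sketch of the fix being asked for (an assumption, not the committed code): when every field of the row is null the row is skipped, so it should also be counted as filtered, the same way an invalid row is. It only uses members already present in this patch (`_state`, `_counter`, `slot_descs`):

```cpp
// Sketch only: also account for the skipped all-NULL row before reading the next one.
if (!valid || nullcount == slot_descs.size()) {
    if (nullcount == slot_descs.size()) {
        _state->append_error_msg_to_file("", "All fields of this row are null.");
        _counter->num_rows_filtered++;   // count the skipped row as filtered
    }
    continue; // read next row
}
```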
########## File path: be/src/exec/json_scanner.h ########## @@ -0,0 +1,146 @@ (same hunk as quoted above) ... + Status handle_simple_json(Tuple* tuple, std::vector<SlotDescriptor*> slot_descs, MemPool* tuple_pool, bool* eof); + Status handle_complex_json(Tuple* tuple, std::vector<SlotDescriptor*> slot_descs, MemPool* tuple_pool, bool* eof);

Review comment:
```suggestion
    Status handle_complex_json(Tuple* tuple, const std::vector<SlotDescriptor*>& slot_descs, MemPool* tuple_pool, bool* eof);
```
There are many `slot_descs` parameters in other places with the same problem.
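For illustration only (hypothetical helper names, not part of the patch), the difference the suggestion is pointing at:

```cpp
#include <vector>

class SlotDescriptor;  // forward declaration; defined in runtime/descriptors.h in this patch

// Passing by value copies the whole vector of pointers on every call;
// a const reference avoids the copy and documents that the callee does not modify it.
void handle_by_value(std::vector<SlotDescriptor*> slot_descs);        // copies the vector
void handle_by_cref(const std::vector<SlotDescriptor*>& slot_descs);  // no copy, read-only view
```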
+ + +#include "exec/json_scanner.h" +#include <algorithm> +#include "gutil/strings/split.h" +#include "runtime/exec_env.h" +#include "runtime/mem_tracker.h" +#include "runtime/raw_value.h" +#include "runtime/runtime_state.h" +#include "exprs/expr.h" +#include "env/env.h" +#include "exec/local_file_reader.h" +#include "exec/broker_reader.h" +#include "exprs/json_functions.h" + +namespace doris { + +JsonScanner::JsonScanner(RuntimeState* state, + RuntimeProfile* profile, + const TBrokerScanRangeParams& params, + const std::vector<TBrokerRangeDesc>& ranges, + const std::vector<TNetworkAddress>& broker_addresses, + ScannerCounter* counter) : BaseScanner(state, profile, params, counter), + _ranges(ranges), + _broker_addresses(broker_addresses), + _cur_file_reader(nullptr), + _next_range(0), + _cur_file_eof(false), + _scanner_eof(false) { + +} + +JsonScanner::~JsonScanner() { + close(); +} + +Status JsonScanner::open() { + return BaseScanner::open(); +} + +Status JsonScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof) { + SCOPED_TIMER(_read_timer); + // Get one line + while (!_scanner_eof) { + if (_cur_file_reader == nullptr || _cur_file_eof) { + RETURN_IF_ERROR(open_next_reader()); + // If there isn't any more reader, break this + if (_scanner_eof) { + continue; + } + _cur_file_eof = false; + } + RETURN_IF_ERROR(_cur_file_reader->read(_src_tuple, _src_slot_descs, tuple_pool, &_cur_file_eof)); + + if (_cur_file_eof) { + continue; // read next file + } + COUNTER_UPDATE(_rows_read_counter, 1); + SCOPED_TIMER(_materialize_timer); + if (fill_dest_tuple(Slice(), tuple, tuple_pool)) { + break;// break if true + } + } + if (_scanner_eof) { + *eof = true; + } else { + *eof = false; + } + return Status::OK(); +} + +Status JsonScanner::open_next_reader() { + if (_cur_file_reader != nullptr) { + delete _cur_file_reader; + _cur_file_reader = nullptr; + if (_stream_load_pipe != nullptr) { + _stream_load_pipe.reset(); + } + } + if (_next_range >= _ranges.size()) { + _scanner_eof = true; + return Status::OK(); + } + const TBrokerRangeDesc& range = _ranges[_next_range++]; + int64_t start_offset = range.start_offset; + if (start_offset != 0) { + start_offset -= 1; + } + FileReader *file = NULL; + switch (range.file_type) { + case TFileType::FILE_LOCAL: { + LocalFileReader* file_reader = new LocalFileReader(range.path, start_offset); + RETURN_IF_ERROR(file_reader->open()); + file = file_reader; + break; + } + case TFileType::FILE_BROKER: { + BrokerReader* broker_reader = new BrokerReader( + _state->exec_env(), _broker_addresses, _params.properties, range.path, start_offset); + RETURN_IF_ERROR(broker_reader->open()); + file = broker_reader; + break; + } + + case TFileType::FILE_STREAM: { + _stream_load_pipe = _state->exec_env()->load_stream_mgr()->get(range.load_id); + if (_stream_load_pipe == nullptr) { + VLOG(3) << "unknown stream load id: " << UniqueId(range.load_id); + return Status::InternalError("unknown stream load id"); + } + file = _stream_load_pipe.get(); + break; + } + default: { + std::stringstream ss; + ss << "Unknown file type, type=" << range.file_type; + return Status::InternalError(ss.str()); + } + } + + std::string jsonpath = ""; + std::string jsonpath_file = ""; + if (range.__isset.jsonpath) { + jsonpath = range.jsonpath; + } else if (range.__isset.jsonpath_file) { + jsonpath_file = range.jsonpath_file; + } + _cur_file_reader = new JsonReader(_state, _counter, _state->exec_env()->small_file_mgr(), _profile, file, jsonpath, jsonpath_file); + + return Status::OK(); +} + +void 
JsonScanner::close() { + if (_cur_file_reader != nullptr) { + delete _cur_file_reader; + _cur_file_reader = nullptr; + if (_stream_load_pipe != nullptr) { + _stream_load_pipe.reset(); + } + } +} + +////// class JsonDataInternal +JsonDataInternal::JsonDataInternal(rapidjson::Value* v) : + _json_values(v), _iterator(v->Begin()) { +} + +JsonDataInternal::~JsonDataInternal() { + +} +bool JsonDataInternal::is_end() { + return _json_values->End() == _iterator; +} + +rapidjson::Value::ConstValueIterator JsonDataInternal::get_next() { + if (is_end()) { + return nullptr; + } + return _iterator++; +} + + +////// class JsonReader +JsonReader::JsonReader( + RuntimeState* state, ScannerCounter* counter, + SmallFileMgr *fileMgr, + RuntimeProfile* profile, + FileReader* file_reader, + std::string& jsonpath, + std::string& jsonpath_file) : + _next_line(0), + _total_lines(0), + _state(state), + _counter(counter), + _profile(profile), + _file_reader(file_reader), + _closed(false) { + _bytes_read_counter = ADD_COUNTER(_profile, "BytesRead", TUnit::BYTES); + _read_timer = ADD_TIMER(_profile, "FileReadTime"); + + init_jsonpath(fileMgr, jsonpath, jsonpath_file); +} + +JsonReader::~JsonReader() { + close(); +} + +void JsonReader::init_jsonpath(SmallFileMgr *fileMgr, std::string& jsonpath, std::string& jsonpath_file) { + //parse jsonpath + if (!jsonpath.empty()) { + if (!_jsonpath_doc.Parse(jsonpath.c_str()).HasParseError()) { + if (!_jsonpath_doc.HasMember(JsonReader::JSON_PATH) || !_jsonpath_doc[JsonReader::JSON_PATH].IsArray()) { + _parse_jsonpath_flag = -1;// failed, has none object + } else { + _parse_jsonpath_flag = 1;// success + } + } else { + _parse_jsonpath_flag = -1;// parse failed + } + } else if (!jsonpath_file.empty()) { + //Read jsonpath from file, has format: file_id:md5 + _parse_jsonpath_flag = parse_jsonpath_from_file(fileMgr, jsonpath_file); + } else { + _parse_jsonpath_flag = 0; + } + return ; +} + +void JsonReader::close() { + if (_closed) { + return; + } + if (typeid(*_file_reader) == typeid(doris::BrokerReader) || typeid(*_file_reader) == typeid(doris::LocalFileReader)) { + _file_reader->close(); + delete _file_reader; + } + _closed = true; +} + +int JsonReader::parse_jsonpath_from_file(SmallFileMgr *smallFileMgr, std::string& fileinfo ) { + std::vector<std::string> parts = strings::Split(fileinfo, ":", strings::SkipWhitespace()); + if (parts.size() != 2) { + LOG(WARNING)<< "parse_jsonpath_from_file Invalid fileinfo: " << fileinfo; + return -1; + } + int64_t file_id = std::stol(parts[0]); + std::string file_path; + Status st = smallFileMgr->get_file(file_id, parts[1], &file_path); + if (!st.ok()) { + return -1; + } + std::unique_ptr<RandomAccessFile> jsonPathFile; + st = Env::Default()->new_random_access_file(file_path, &jsonPathFile); + if (!st.ok()) { + return -1; + } + uint64_t size = 0; + jsonPathFile->size(&size); + if (size == 0) { + return 0; + } + boost::scoped_array<char> pBuf(new char[size]); + Slice slice(pBuf.get(), size); + st = jsonPathFile->read_at(0, slice); + if (!st.ok()) { + return -1; + } + + if (!_jsonpath_doc.Parse(slice.get_data()).HasParseError()) { + if (!_jsonpath_doc.HasMember(JsonReader::JSON_PATH) || !_jsonpath_doc[JsonReader::JSON_PATH].IsArray()) { + return -1;//failed, has none object + } else { + return 1;// success + } + } else { + return -1;// parse failed + } +} + +Status JsonReader::parse_json_doc(bool *eof) { + // read all, must be delete json_str + uint8_t* json_str = nullptr; + size_t length = 0; + RETURN_IF_ERROR(_file_reader->read(&json_str, 
&length)); + if (length == 0) { + *eof = true; + return Status::OK(); + } + // parse jsondata to JsonDoc + if (_json_doc.Parse((char*)json_str, length).HasParseError()) { + delete[] json_str; + return Status::InternalError("Parse json data for JsonDoc is failed."); + } + delete[] json_str; + return Status::OK(); +} + +size_t JsonReader::get_data_by_jsonpath() { + size_t max_lines = 0; + //iterator jsonpath to find object and save it to Map + _jmap.clear(); + const rapidjson::Value& arrJsonPath = _jsonpath_doc[JsonReader::JSON_PATH]; + for (int i = 0; i < arrJsonPath.Size(); i++) { + const rapidjson::Value& info = arrJsonPath[i]; + if (!info.IsObject() || !info.HasMember("column") || !info.HasMember("value") || + !info["column"].IsString() || !info["value"].IsString()) { + return -1; + } + + std::string column = info["column"].GetString(); + std::string value = info["value"].GetString(); + // if jsonValues is null, because not match in jsondata. + rapidjson::Value* json_values = JsonFunctions::get_json_object_from_parsed_json(value, &_json_doc); + if (json_values == NULL) { + return -1; + } + if (json_values->IsArray()) { + max_lines = std::max(max_lines, (size_t)json_values->Size()); + } else { + max_lines = std::max(max_lines, (size_t)1); + } + _jmap.emplace(column, json_values); + } + return max_lines; +} + +void JsonReader::fill_slot(Tuple* tuple, SlotDescriptor* slot_desc, MemPool* mem_pool, const uint8_t* value, int32_t len) { + tuple->set_not_null(slot_desc->null_indicator_offset()); + void* slot = tuple->get_slot(slot_desc->tuple_offset()); + StringValue* str_slot = reinterpret_cast<StringValue*>(slot); + str_slot->ptr = reinterpret_cast<char*>(mem_pool->allocate(len)); + memcpy(str_slot->ptr, value, len); + str_slot->len = len; + return; +} + +Status JsonReader::write_data_to_tuple(rapidjson::Value::ConstValueIterator value, SlotDescriptor* desc, Tuple* tuple, MemPool* tuple_pool) { + const char *str_value = NULL; + uint8_t tmp_buf[128] = {0}; + int32_t wbytes = 0; + switch (value->GetType()) { + case rapidjson::Type::kStringType: + str_value = value->GetString(); + fill_slot(tuple, desc, tuple_pool, (uint8_t*)str_value, strlen(str_value)); + break; + case rapidjson::Type::kNumberType: + if (value->IsUint()) { + wbytes = sprintf((char*)tmp_buf, "%u", value->GetUint()); + fill_slot(tuple, desc, tuple_pool, tmp_buf, wbytes); + } else if (value->IsInt()) { + wbytes = sprintf((char*)tmp_buf, "%d", value->GetInt()); + fill_slot(tuple, desc, tuple_pool, tmp_buf, wbytes); + } else if (value->IsUint64()) { + wbytes = sprintf((char*)tmp_buf, "%lu", value->GetUint64()); + fill_slot(tuple, desc, tuple_pool, tmp_buf, wbytes); + } else if (value->IsInt64()) { + wbytes = sprintf((char*)tmp_buf, "%ld", value->GetInt64()); + fill_slot(tuple, desc, tuple_pool, tmp_buf, wbytes); + } else { + wbytes = sprintf((char*)tmp_buf, "%f", value->GetDouble()); + fill_slot(tuple, desc, tuple_pool, tmp_buf, wbytes); + } + break; + case rapidjson::Type::kFalseType: + //fill_slot(tuple, desc, tuple_pool, (uint8_t*)"false", 5); + fill_slot(tuple, desc, tuple_pool, (uint8_t*)"0", 1); + break; + case rapidjson::Type::kTrueType: + //fill_slot(tuple, desc, tuple_pool, (uint8_t*)"true", 4); + fill_slot(tuple, desc, tuple_pool, (uint8_t*)"1", 1); + break; + case rapidjson::Type::kNullType: + if (desc->is_nullable()) { + tuple->set_null(desc->null_indicator_offset()); + } else { + std::stringstream str_error; + str_error << "Json value is null, but the column `" << desc->col_name() << "` is not nullable."; + LOG(WARNING) 
<< str_error.str(); + return Status::RuntimeError(str_error.str()); + } + break; + default: + std::stringstream str_error; + str_error << "Invalid JsonType " << value->GetType() << ", Column Name `" << desc->col_name() << "`."; + LOG(WARNING) << str_error.str(); + return Status::RuntimeError(str_error.str()); + } + return Status::OK(); +} + +/** + * handle input a simple json + * For example: + * 1. {"doris_data": [{"colunm1":"value1", "colunm2":10}, {"colunm1":"value2", "colunm2":30}]} + * 2. {"colunm1":"value1", "colunm2":10} + */ +Status JsonReader::set_tuple_value(rapidjson::Value& objectValue, Tuple* tuple, std::vector<SlotDescriptor*> slot_descs, MemPool* tuple_pool, bool *valid, int *nullcount) { + for (auto v : slot_descs) { + if (objectValue.HasMember(v->col_name().c_str())) { + rapidjson::Value& value = objectValue[v->col_name().c_str()]; + RETURN_IF_ERROR(write_data_to_tuple(&value, v, tuple, tuple_pool)); + } else { + if (v->is_nullable()) { + tuple->set_null(v->null_indicator_offset()); + (*nullcount)++; + } else { + std::stringstream str_error; + str_error << "The column `" << v->col_name() << "` is not nullable, but it's not found in jsondata."; + LOG(WARNING) << str_error.str(); + _state->append_error_msg_to_file("", str_error.str()); + _counter->num_rows_filtered++; + *valid = false; // current row is invalid + break; + } + } + } + *valid = true; + return Status::OK(); +} + +Status JsonReader::handle_simple_json(Tuple* tuple, std::vector<SlotDescriptor*> slot_descs, MemPool* tuple_pool, bool* eof) { + do { + bool valid = false; + int nullcount = 0; + if (_next_line >= _total_lines) {//parse json and generic document + RETURN_IF_ERROR(parse_json_doc(eof)); + if (*eof) {// read all data, then return + return Status::OK(); + } + if (_json_doc.HasMember(JsonReader::DORIS_DATA) && _json_doc[JsonReader::DORIS_DATA].IsArray() ) { + _total_lines = _json_doc[JsonReader::DORIS_DATA].Size(); + } else { + _total_lines = 1; // only one row + } + _next_line = 0; + } + + if (_json_doc.HasMember(JsonReader::DORIS_DATA) && _json_doc[JsonReader::DORIS_DATA].IsArray()) {//handle case 1 Review comment: What is `case 1`, And what is `case 2`? ########## File path: be/src/exec/json_scanner.cpp ########## @@ -0,0 +1,517 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
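For context (based only on the doc comment of `handle_simple_json` quoted earlier), the two branches appear to correspond to the two example inputs in that comment:

```cpp
// Restating the examples from the doc comment above (sample data only):
//   case 1: {"doris_data": [{"colunm1":"value1", "colunm2":10}, {"colunm1":"value2", "colunm2":30}]}
//           -> the "doris_data" member is an array and each element is one row
//   case 2: {"colunm1":"value1", "colunm2":10}
//           -> the whole document is a single row
if (_json_doc.HasMember(JsonReader::DORIS_DATA) && _json_doc[JsonReader::DORIS_DATA].IsArray()) {
    // case 1: iterate the array, one object per row
} else {
    // case 2: treat the document itself as the only row
}
```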
+ + +#include "exec/json_scanner.h" +#include <algorithm> +#include "gutil/strings/split.h" +#include "runtime/exec_env.h" +#include "runtime/mem_tracker.h" +#include "runtime/raw_value.h" +#include "runtime/runtime_state.h" +#include "exprs/expr.h" +#include "env/env.h" +#include "exec/local_file_reader.h" +#include "exec/broker_reader.h" +#include "exprs/json_functions.h" + +namespace doris { + +JsonScanner::JsonScanner(RuntimeState* state, + RuntimeProfile* profile, + const TBrokerScanRangeParams& params, + const std::vector<TBrokerRangeDesc>& ranges, + const std::vector<TNetworkAddress>& broker_addresses, + ScannerCounter* counter) : BaseScanner(state, profile, params, counter), + _ranges(ranges), + _broker_addresses(broker_addresses), + _cur_file_reader(nullptr), + _next_range(0), + _cur_file_eof(false), + _scanner_eof(false) { + +} + +JsonScanner::~JsonScanner() { + close(); +} + +Status JsonScanner::open() { + return BaseScanner::open(); +} + +Status JsonScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof) { + SCOPED_TIMER(_read_timer); + // Get one line + while (!_scanner_eof) { + if (_cur_file_reader == nullptr || _cur_file_eof) { + RETURN_IF_ERROR(open_next_reader()); + // If there isn't any more reader, break this + if (_scanner_eof) { + continue; + } + _cur_file_eof = false; + } + RETURN_IF_ERROR(_cur_file_reader->read(_src_tuple, _src_slot_descs, tuple_pool, &_cur_file_eof)); + + if (_cur_file_eof) { + continue; // read next file + } + COUNTER_UPDATE(_rows_read_counter, 1); + SCOPED_TIMER(_materialize_timer); + if (fill_dest_tuple(Slice(), tuple, tuple_pool)) { + break;// break if true + } + } + if (_scanner_eof) { + *eof = true; + } else { + *eof = false; + } + return Status::OK(); +} + +Status JsonScanner::open_next_reader() { + if (_cur_file_reader != nullptr) { + delete _cur_file_reader; + _cur_file_reader = nullptr; + if (_stream_load_pipe != nullptr) { + _stream_load_pipe.reset(); + } + } + if (_next_range >= _ranges.size()) { + _scanner_eof = true; + return Status::OK(); + } + const TBrokerRangeDesc& range = _ranges[_next_range++]; + int64_t start_offset = range.start_offset; + if (start_offset != 0) { + start_offset -= 1; + } + FileReader *file = NULL; + switch (range.file_type) { + case TFileType::FILE_LOCAL: { + LocalFileReader* file_reader = new LocalFileReader(range.path, start_offset); + RETURN_IF_ERROR(file_reader->open()); + file = file_reader; + break; + } + case TFileType::FILE_BROKER: { + BrokerReader* broker_reader = new BrokerReader( + _state->exec_env(), _broker_addresses, _params.properties, range.path, start_offset); + RETURN_IF_ERROR(broker_reader->open()); + file = broker_reader; + break; + } + + case TFileType::FILE_STREAM: { + _stream_load_pipe = _state->exec_env()->load_stream_mgr()->get(range.load_id); + if (_stream_load_pipe == nullptr) { + VLOG(3) << "unknown stream load id: " << UniqueId(range.load_id); + return Status::InternalError("unknown stream load id"); + } + file = _stream_load_pipe.get(); + break; + } + default: { + std::stringstream ss; + ss << "Unknown file type, type=" << range.file_type; + return Status::InternalError(ss.str()); + } + } + + std::string jsonpath = ""; + std::string jsonpath_file = ""; + if (range.__isset.jsonpath) { + jsonpath = range.jsonpath; + } else if (range.__isset.jsonpath_file) { + jsonpath_file = range.jsonpath_file; + } + _cur_file_reader = new JsonReader(_state, _counter, _state->exec_env()->small_file_mgr(), _profile, file, jsonpath, jsonpath_file); + + return Status::OK(); +} + +void 
JsonScanner::close() { + if (_cur_file_reader != nullptr) { + delete _cur_file_reader; + _cur_file_reader = nullptr; + if (_stream_load_pipe != nullptr) { + _stream_load_pipe.reset(); + } + } +} + +////// class JsonDataInternal +JsonDataInternal::JsonDataInternal(rapidjson::Value* v) : + _json_values(v), _iterator(v->Begin()) { +} + +JsonDataInternal::~JsonDataInternal() { + +} +bool JsonDataInternal::is_end() { + return _json_values->End() == _iterator; +} + +rapidjson::Value::ConstValueIterator JsonDataInternal::get_next() { + if (is_end()) { + return nullptr; + } + return _iterator++; +} + + +////// class JsonReader +JsonReader::JsonReader( + RuntimeState* state, ScannerCounter* counter, + SmallFileMgr *fileMgr, + RuntimeProfile* profile, + FileReader* file_reader, + std::string& jsonpath, + std::string& jsonpath_file) : + _next_line(0), + _total_lines(0), + _state(state), + _counter(counter), + _profile(profile), + _file_reader(file_reader), + _closed(false) { + _bytes_read_counter = ADD_COUNTER(_profile, "BytesRead", TUnit::BYTES); + _read_timer = ADD_TIMER(_profile, "FileReadTime"); + + init_jsonpath(fileMgr, jsonpath, jsonpath_file); +} + +JsonReader::~JsonReader() { + close(); +} + +void JsonReader::init_jsonpath(SmallFileMgr *fileMgr, std::string& jsonpath, std::string& jsonpath_file) { + //parse jsonpath + if (!jsonpath.empty()) { + if (!_jsonpath_doc.Parse(jsonpath.c_str()).HasParseError()) { + if (!_jsonpath_doc.HasMember(JsonReader::JSON_PATH) || !_jsonpath_doc[JsonReader::JSON_PATH].IsArray()) { + _parse_jsonpath_flag = -1;// failed, has none object + } else { + _parse_jsonpath_flag = 1;// success + } + } else { + _parse_jsonpath_flag = -1;// parse failed + } + } else if (!jsonpath_file.empty()) { + //Read jsonpath from file, has format: file_id:md5 + _parse_jsonpath_flag = parse_jsonpath_from_file(fileMgr, jsonpath_file); + } else { + _parse_jsonpath_flag = 0; + } + return ; +} + +void JsonReader::close() { + if (_closed) { + return; + } + if (typeid(*_file_reader) == typeid(doris::BrokerReader) || typeid(*_file_reader) == typeid(doris::LocalFileReader)) { + _file_reader->close(); + delete _file_reader; + } + _closed = true; +} + +int JsonReader::parse_jsonpath_from_file(SmallFileMgr *smallFileMgr, std::string& fileinfo ) { + std::vector<std::string> parts = strings::Split(fileinfo, ":", strings::SkipWhitespace()); + if (parts.size() != 2) { + LOG(WARNING)<< "parse_jsonpath_from_file Invalid fileinfo: " << fileinfo; + return -1; + } + int64_t file_id = std::stol(parts[0]); + std::string file_path; + Status st = smallFileMgr->get_file(file_id, parts[1], &file_path); + if (!st.ok()) { + return -1; + } + std::unique_ptr<RandomAccessFile> jsonPathFile; + st = Env::Default()->new_random_access_file(file_path, &jsonPathFile); + if (!st.ok()) { + return -1; + } + uint64_t size = 0; + jsonPathFile->size(&size); + if (size == 0) { + return 0; + } + boost::scoped_array<char> pBuf(new char[size]); + Slice slice(pBuf.get(), size); + st = jsonPathFile->read_at(0, slice); + if (!st.ok()) { + return -1; + } + + if (!_jsonpath_doc.Parse(slice.get_data()).HasParseError()) { + if (!_jsonpath_doc.HasMember(JsonReader::JSON_PATH) || !_jsonpath_doc[JsonReader::JSON_PATH].IsArray()) { + return -1;//failed, has none object + } else { + return 1;// success + } + } else { + return -1;// parse failed + } +} + +Status JsonReader::parse_json_doc(bool *eof) { + // read all, must be delete json_str + uint8_t* json_str = nullptr; + size_t length = 0; + RETURN_IF_ERROR(_file_reader->read(&json_str, 
&length)); + if (length == 0) { + *eof = true; + return Status::OK(); + } + // parse jsondata to JsonDoc + if (_json_doc.Parse((char*)json_str, length).HasParseError()) { + delete[] json_str; + return Status::InternalError("Parse json data for JsonDoc is failed."); + } + delete[] json_str; + return Status::OK(); +} + +size_t JsonReader::get_data_by_jsonpath() { + size_t max_lines = 0; + //iterator jsonpath to find object and save it to Map + _jmap.clear(); + const rapidjson::Value& arrJsonPath = _jsonpath_doc[JsonReader::JSON_PATH]; + for (int i = 0; i < arrJsonPath.Size(); i++) { + const rapidjson::Value& info = arrJsonPath[i]; + if (!info.IsObject() || !info.HasMember("column") || !info.HasMember("value") || + !info["column"].IsString() || !info["value"].IsString()) { + return -1; + } + + std::string column = info["column"].GetString(); + std::string value = info["value"].GetString(); + // if jsonValues is null, because not match in jsondata. + rapidjson::Value* json_values = JsonFunctions::get_json_object_from_parsed_json(value, &_json_doc); + if (json_values == NULL) { + return -1; + } + if (json_values->IsArray()) { + max_lines = std::max(max_lines, (size_t)json_values->Size()); + } else { + max_lines = std::max(max_lines, (size_t)1); + } + _jmap.emplace(column, json_values); + } + return max_lines; +} + +void JsonReader::fill_slot(Tuple* tuple, SlotDescriptor* slot_desc, MemPool* mem_pool, const uint8_t* value, int32_t len) { + tuple->set_not_null(slot_desc->null_indicator_offset()); + void* slot = tuple->get_slot(slot_desc->tuple_offset()); + StringValue* str_slot = reinterpret_cast<StringValue*>(slot); + str_slot->ptr = reinterpret_cast<char*>(mem_pool->allocate(len)); + memcpy(str_slot->ptr, value, len); + str_slot->len = len; + return; +} + +Status JsonReader::write_data_to_tuple(rapidjson::Value::ConstValueIterator value, SlotDescriptor* desc, Tuple* tuple, MemPool* tuple_pool) { + const char *str_value = NULL; + uint8_t tmp_buf[128] = {0}; + int32_t wbytes = 0; + switch (value->GetType()) { + case rapidjson::Type::kStringType: + str_value = value->GetString(); + fill_slot(tuple, desc, tuple_pool, (uint8_t*)str_value, strlen(str_value)); + break; + case rapidjson::Type::kNumberType: + if (value->IsUint()) { + wbytes = sprintf((char*)tmp_buf, "%u", value->GetUint()); + fill_slot(tuple, desc, tuple_pool, tmp_buf, wbytes); + } else if (value->IsInt()) { + wbytes = sprintf((char*)tmp_buf, "%d", value->GetInt()); + fill_slot(tuple, desc, tuple_pool, tmp_buf, wbytes); + } else if (value->IsUint64()) { + wbytes = sprintf((char*)tmp_buf, "%lu", value->GetUint64()); + fill_slot(tuple, desc, tuple_pool, tmp_buf, wbytes); + } else if (value->IsInt64()) { + wbytes = sprintf((char*)tmp_buf, "%ld", value->GetInt64()); + fill_slot(tuple, desc, tuple_pool, tmp_buf, wbytes); + } else { + wbytes = sprintf((char*)tmp_buf, "%f", value->GetDouble()); + fill_slot(tuple, desc, tuple_pool, tmp_buf, wbytes); + } + break; + case rapidjson::Type::kFalseType: + //fill_slot(tuple, desc, tuple_pool, (uint8_t*)"false", 5); + fill_slot(tuple, desc, tuple_pool, (uint8_t*)"0", 1); + break; + case rapidjson::Type::kTrueType: + //fill_slot(tuple, desc, tuple_pool, (uint8_t*)"true", 4); + fill_slot(tuple, desc, tuple_pool, (uint8_t*)"1", 1); + break; + case rapidjson::Type::kNullType: + if (desc->is_nullable()) { + tuple->set_null(desc->null_indicator_offset()); + } else { + std::stringstream str_error; + str_error << "Json value is null, but the column `" << desc->col_name() << "` is not nullable."; + LOG(WARNING) 
<< str_error.str(); + return Status::RuntimeError(str_error.str()); + } + break; + default: + std::stringstream str_error; + str_error << "Invalid JsonType " << value->GetType() << ", Column Name `" << desc->col_name() << "`."; + LOG(WARNING) << str_error.str(); + return Status::RuntimeError(str_error.str()); + } + return Status::OK(); +} + +/** + * handle input a simple json + * For example: + * 1. {"doris_data": [{"colunm1":"value1", "colunm2":10}, {"colunm1":"value2", "colunm2":30}]} + * 2. {"colunm1":"value1", "colunm2":10} + */ +Status JsonReader::set_tuple_value(rapidjson::Value& objectValue, Tuple* tuple, std::vector<SlotDescriptor*> slot_descs, MemPool* tuple_pool, bool *valid, int *nullcount) { + for (auto v : slot_descs) { + if (objectValue.HasMember(v->col_name().c_str())) { + rapidjson::Value& value = objectValue[v->col_name().c_str()]; + RETURN_IF_ERROR(write_data_to_tuple(&value, v, tuple, tuple_pool)); + } else { + if (v->is_nullable()) { + tuple->set_null(v->null_indicator_offset()); + (*nullcount)++; + } else { + std::stringstream str_error; + str_error << "The column `" << v->col_name() << "` is not nullable, but it's not found in jsondata."; + LOG(WARNING) << str_error.str(); + _state->append_error_msg_to_file("", str_error.str()); + _counter->num_rows_filtered++; + *valid = false; // current row is invalid + break; + } + } + } + *valid = true; + return Status::OK(); +} + +Status JsonReader::handle_simple_json(Tuple* tuple, std::vector<SlotDescriptor*> slot_descs, MemPool* tuple_pool, bool* eof) { + do { + bool valid = false; + int nullcount = 0; + if (_next_line >= _total_lines) {//parse json and generic document + RETURN_IF_ERROR(parse_json_doc(eof)); + if (*eof) {// read all data, then return + return Status::OK(); + } + if (_json_doc.HasMember(JsonReader::DORIS_DATA) && _json_doc[JsonReader::DORIS_DATA].IsArray() ) { + _total_lines = _json_doc[JsonReader::DORIS_DATA].Size(); + } else { + _total_lines = 1; // only one row + } + _next_line = 0; + } + + if (_json_doc.HasMember(JsonReader::DORIS_DATA) && _json_doc[JsonReader::DORIS_DATA].IsArray()) {//handle case 1 + rapidjson::Value& valueArray = _json_doc[JsonReader::DORIS_DATA]; + rapidjson::Value& objectValue = valueArray[_next_line];// json object + RETURN_IF_ERROR(set_tuple_value(objectValue, tuple, slot_descs, tuple_pool, &valid, &nullcount)); + } else {// handle case 2 + RETURN_IF_ERROR(set_tuple_value(_json_doc, tuple, slot_descs, tuple_pool, &valid, &nullcount)); + } + _next_line++; + if (!valid || nullcount == slot_descs.size()) {// All fields is null, read next one + continue; + } + break; // get a valid row, then break + } while (_next_line <= _total_lines); + return Status::OK(); +} + +Status JsonReader::set_tuple_value_from_map(Tuple* tuple, std::vector<SlotDescriptor*> slot_descs, MemPool* tuple_pool, bool *valid) { + std::unordered_map<std::string, JsonDataInternal>::iterator it_map; + for (auto v : slot_descs) { + it_map = _jmap.find(v->col_name()); + if (it_map == _jmap.end()) { + return Status::RuntimeError("The column name of table is not foud in jsonpath."); + } + rapidjson::Value::ConstValueIterator value = it_map->second.get_next(); + if (value == nullptr) { + if (v->is_nullable()) { + tuple->set_null(v->null_indicator_offset()); + } else { + std::stringstream str_error; + str_error << "The column `" << it_map->first << "` is not nullable, but it's not found in jsondata."; + LOG(WARNING) << str_error.str(); Review comment: Not print any log here, or it may print lots of logs ########## File path: 
be/src/exec/json_scanner.cpp ########## @@ -0,0 +1,517 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +#include "exec/json_scanner.h" +#include <algorithm> +#include "gutil/strings/split.h" +#include "runtime/exec_env.h" +#include "runtime/mem_tracker.h" +#include "runtime/raw_value.h" +#include "runtime/runtime_state.h" +#include "exprs/expr.h" +#include "env/env.h" +#include "exec/local_file_reader.h" +#include "exec/broker_reader.h" +#include "exprs/json_functions.h" + +namespace doris { + +JsonScanner::JsonScanner(RuntimeState* state, + RuntimeProfile* profile, + const TBrokerScanRangeParams& params, + const std::vector<TBrokerRangeDesc>& ranges, + const std::vector<TNetworkAddress>& broker_addresses, + ScannerCounter* counter) : BaseScanner(state, profile, params, counter), + _ranges(ranges), + _broker_addresses(broker_addresses), + _cur_file_reader(nullptr), + _next_range(0), + _cur_file_eof(false), + _scanner_eof(false) { + +} + +JsonScanner::~JsonScanner() { + close(); +} + +Status JsonScanner::open() { + return BaseScanner::open(); +} + +Status JsonScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof) { + SCOPED_TIMER(_read_timer); + // Get one line + while (!_scanner_eof) { + if (_cur_file_reader == nullptr || _cur_file_eof) { + RETURN_IF_ERROR(open_next_reader()); + // If there isn't any more reader, break this + if (_scanner_eof) { + continue; + } + _cur_file_eof = false; + } + RETURN_IF_ERROR(_cur_file_reader->read(_src_tuple, _src_slot_descs, tuple_pool, &_cur_file_eof)); + + if (_cur_file_eof) { + continue; // read next file + } + COUNTER_UPDATE(_rows_read_counter, 1); + SCOPED_TIMER(_materialize_timer); + if (fill_dest_tuple(Slice(), tuple, tuple_pool)) { + break;// break if true + } + } + if (_scanner_eof) { + *eof = true; + } else { + *eof = false; + } + return Status::OK(); +} + +Status JsonScanner::open_next_reader() { + if (_cur_file_reader != nullptr) { + delete _cur_file_reader; + _cur_file_reader = nullptr; + if (_stream_load_pipe != nullptr) { + _stream_load_pipe.reset(); + } + } + if (_next_range >= _ranges.size()) { + _scanner_eof = true; + return Status::OK(); + } + const TBrokerRangeDesc& range = _ranges[_next_range++]; + int64_t start_offset = range.start_offset; + if (start_offset != 0) { + start_offset -= 1; + } + FileReader *file = NULL; + switch (range.file_type) { + case TFileType::FILE_LOCAL: { + LocalFileReader* file_reader = new LocalFileReader(range.path, start_offset); + RETURN_IF_ERROR(file_reader->open()); + file = file_reader; + break; + } + case TFileType::FILE_BROKER: { + BrokerReader* broker_reader = new BrokerReader( + _state->exec_env(), _broker_addresses, _params.properties, range.path, start_offset); + RETURN_IF_ERROR(broker_reader->open()); + file = broker_reader; + break; 
+ } + + case TFileType::FILE_STREAM: { + _stream_load_pipe = _state->exec_env()->load_stream_mgr()->get(range.load_id); + if (_stream_load_pipe == nullptr) { + VLOG(3) << "unknown stream load id: " << UniqueId(range.load_id); + return Status::InternalError("unknown stream load id"); + } + file = _stream_load_pipe.get(); + break; + } + default: { + std::stringstream ss; + ss << "Unknown file type, type=" << range.file_type; + return Status::InternalError(ss.str()); + } + } + + std::string jsonpath = ""; + std::string jsonpath_file = ""; + if (range.__isset.jsonpath) { + jsonpath = range.jsonpath; + } else if (range.__isset.jsonpath_file) { + jsonpath_file = range.jsonpath_file; + } + _cur_file_reader = new JsonReader(_state, _counter, _state->exec_env()->small_file_mgr(), _profile, file, jsonpath, jsonpath_file); + + return Status::OK(); +} + +void JsonScanner::close() { + if (_cur_file_reader != nullptr) { + delete _cur_file_reader; + _cur_file_reader = nullptr; + if (_stream_load_pipe != nullptr) { + _stream_load_pipe.reset(); + } + } +} + +////// class JsonDataInternal +JsonDataInternal::JsonDataInternal(rapidjson::Value* v) : + _json_values(v), _iterator(v->Begin()) { +} + +JsonDataInternal::~JsonDataInternal() { + +} +bool JsonDataInternal::is_end() { + return _json_values->End() == _iterator; +} + +rapidjson::Value::ConstValueIterator JsonDataInternal::get_next() { + if (is_end()) { + return nullptr; + } + return _iterator++; +} + + +////// class JsonReader +JsonReader::JsonReader( + RuntimeState* state, ScannerCounter* counter, + SmallFileMgr *fileMgr, + RuntimeProfile* profile, + FileReader* file_reader, + std::string& jsonpath, + std::string& jsonpath_file) : + _next_line(0), + _total_lines(0), + _state(state), + _counter(counter), + _profile(profile), + _file_reader(file_reader), + _closed(false) { + _bytes_read_counter = ADD_COUNTER(_profile, "BytesRead", TUnit::BYTES); + _read_timer = ADD_TIMER(_profile, "FileReadTime"); + + init_jsonpath(fileMgr, jsonpath, jsonpath_file); +} + +JsonReader::~JsonReader() { + close(); +} + +void JsonReader::init_jsonpath(SmallFileMgr *fileMgr, std::string& jsonpath, std::string& jsonpath_file) { + //parse jsonpath + if (!jsonpath.empty()) { + if (!_jsonpath_doc.Parse(jsonpath.c_str()).HasParseError()) { + if (!_jsonpath_doc.HasMember(JsonReader::JSON_PATH) || !_jsonpath_doc[JsonReader::JSON_PATH].IsArray()) { + _parse_jsonpath_flag = -1;// failed, has none object + } else { + _parse_jsonpath_flag = 1;// success + } + } else { + _parse_jsonpath_flag = -1;// parse failed + } + } else if (!jsonpath_file.empty()) { + //Read jsonpath from file, has format: file_id:md5 + _parse_jsonpath_flag = parse_jsonpath_from_file(fileMgr, jsonpath_file); + } else { + _parse_jsonpath_flag = 0; + } + return ; +} + +void JsonReader::close() { + if (_closed) { + return; + } + if (typeid(*_file_reader) == typeid(doris::BrokerReader) || typeid(*_file_reader) == typeid(doris::LocalFileReader)) { + _file_reader->close(); + delete _file_reader; + } + _closed = true; +} + +int JsonReader::parse_jsonpath_from_file(SmallFileMgr *smallFileMgr, std::string& fileinfo ) { + std::vector<std::string> parts = strings::Split(fileinfo, ":", strings::SkipWhitespace()); + if (parts.size() != 2) { + LOG(WARNING)<< "parse_jsonpath_from_file Invalid fileinfo: " << fileinfo; + return -1; + } + int64_t file_id = std::stol(parts[0]); + std::string file_path; + Status st = smallFileMgr->get_file(file_id, parts[1], &file_path); + if (!st.ok()) { + return -1; + } + 
std::unique_ptr<RandomAccessFile> jsonPathFile; + st = Env::Default()->new_random_access_file(file_path, &jsonPathFile); + if (!st.ok()) { + return -1; + } + uint64_t size = 0; + jsonPathFile->size(&size); + if (size == 0) { + return 0; + } + boost::scoped_array<char> pBuf(new char[size]); + Slice slice(pBuf.get(), size); + st = jsonPathFile->read_at(0, slice); + if (!st.ok()) { + return -1; + } + + if (!_jsonpath_doc.Parse(slice.get_data()).HasParseError()) { + if (!_jsonpath_doc.HasMember(JsonReader::JSON_PATH) || !_jsonpath_doc[JsonReader::JSON_PATH].IsArray()) { + return -1;//failed, has none object + } else { + return 1;// success + } + } else { + return -1;// parse failed + } +} + +Status JsonReader::parse_json_doc(bool *eof) { + // read all, must be delete json_str + uint8_t* json_str = nullptr; + size_t length = 0; + RETURN_IF_ERROR(_file_reader->read(&json_str, &length)); + if (length == 0) { + *eof = true; + return Status::OK(); + } + // parse jsondata to JsonDoc + if (_json_doc.Parse((char*)json_str, length).HasParseError()) { + delete[] json_str; + return Status::InternalError("Parse json data for JsonDoc is failed."); + } + delete[] json_str; + return Status::OK(); +} + +size_t JsonReader::get_data_by_jsonpath() { + size_t max_lines = 0; + //iterator jsonpath to find object and save it to Map + _jmap.clear(); + const rapidjson::Value& arrJsonPath = _jsonpath_doc[JsonReader::JSON_PATH]; + for (int i = 0; i < arrJsonPath.Size(); i++) { + const rapidjson::Value& info = arrJsonPath[i]; + if (!info.IsObject() || !info.HasMember("column") || !info.HasMember("value") || + !info["column"].IsString() || !info["value"].IsString()) { + return -1; + } + + std::string column = info["column"].GetString(); + std::string value = info["value"].GetString(); + // if jsonValues is null, because not match in jsondata. 
+ rapidjson::Value* json_values = JsonFunctions::get_json_object_from_parsed_json(value, &_json_doc); + if (json_values == NULL) { + return -1; + } + if (json_values->IsArray()) { + max_lines = std::max(max_lines, (size_t)json_values->Size()); + } else { + max_lines = std::max(max_lines, (size_t)1); + } + _jmap.emplace(column, json_values); + } + return max_lines; +} + +void JsonReader::fill_slot(Tuple* tuple, SlotDescriptor* slot_desc, MemPool* mem_pool, const uint8_t* value, int32_t len) { + tuple->set_not_null(slot_desc->null_indicator_offset()); + void* slot = tuple->get_slot(slot_desc->tuple_offset()); + StringValue* str_slot = reinterpret_cast<StringValue*>(slot); + str_slot->ptr = reinterpret_cast<char*>(mem_pool->allocate(len)); + memcpy(str_slot->ptr, value, len); + str_slot->len = len; + return; +} + +Status JsonReader::write_data_to_tuple(rapidjson::Value::ConstValueIterator value, SlotDescriptor* desc, Tuple* tuple, MemPool* tuple_pool) { + const char *str_value = NULL; + uint8_t tmp_buf[128] = {0}; + int32_t wbytes = 0; + switch (value->GetType()) { + case rapidjson::Type::kStringType: + str_value = value->GetString(); + fill_slot(tuple, desc, tuple_pool, (uint8_t*)str_value, strlen(str_value)); + break; + case rapidjson::Type::kNumberType: + if (value->IsUint()) { + wbytes = sprintf((char*)tmp_buf, "%u", value->GetUint()); + fill_slot(tuple, desc, tuple_pool, tmp_buf, wbytes); + } else if (value->IsInt()) { + wbytes = sprintf((char*)tmp_buf, "%d", value->GetInt()); + fill_slot(tuple, desc, tuple_pool, tmp_buf, wbytes); + } else if (value->IsUint64()) { + wbytes = sprintf((char*)tmp_buf, "%lu", value->GetUint64()); + fill_slot(tuple, desc, tuple_pool, tmp_buf, wbytes); + } else if (value->IsInt64()) { + wbytes = sprintf((char*)tmp_buf, "%ld", value->GetInt64()); + fill_slot(tuple, desc, tuple_pool, tmp_buf, wbytes); + } else { + wbytes = sprintf((char*)tmp_buf, "%f", value->GetDouble()); + fill_slot(tuple, desc, tuple_pool, tmp_buf, wbytes); + } + break; + case rapidjson::Type::kFalseType: + //fill_slot(tuple, desc, tuple_pool, (uint8_t*)"false", 5); + fill_slot(tuple, desc, tuple_pool, (uint8_t*)"0", 1); + break; + case rapidjson::Type::kTrueType: + //fill_slot(tuple, desc, tuple_pool, (uint8_t*)"true", 4); + fill_slot(tuple, desc, tuple_pool, (uint8_t*)"1", 1); + break; + case rapidjson::Type::kNullType: + if (desc->is_nullable()) { + tuple->set_null(desc->null_indicator_offset()); + } else { + std::stringstream str_error; + str_error << "Json value is null, but the column `" << desc->col_name() << "` is not nullable."; + LOG(WARNING) << str_error.str(); + return Status::RuntimeError(str_error.str()); + } + break; + default: + std::stringstream str_error; + str_error << "Invalid JsonType " << value->GetType() << ", Column Name `" << desc->col_name() << "`."; + LOG(WARNING) << str_error.str(); + return Status::RuntimeError(str_error.str()); + } + return Status::OK(); +} + +/** + * handle input a simple json + * For example: + * 1. {"doris_data": [{"colunm1":"value1", "colunm2":10}, {"colunm1":"value2", "colunm2":30}]} + * 2. 
{"colunm1":"value1", "colunm2":10} + */ +Status JsonReader::set_tuple_value(rapidjson::Value& objectValue, Tuple* tuple, std::vector<SlotDescriptor*> slot_descs, MemPool* tuple_pool, bool *valid, int *nullcount) { + for (auto v : slot_descs) { + if (objectValue.HasMember(v->col_name().c_str())) { + rapidjson::Value& value = objectValue[v->col_name().c_str()]; + RETURN_IF_ERROR(write_data_to_tuple(&value, v, tuple, tuple_pool)); + } else { + if (v->is_nullable()) { + tuple->set_null(v->null_indicator_offset()); + (*nullcount)++; + } else { + std::stringstream str_error; + str_error << "The column `" << v->col_name() << "` is not nullable, but it's not found in jsondata."; + LOG(WARNING) << str_error.str(); Review comment: Not print any log here ########## File path: be/src/exec/json_scanner.cpp ########## @@ -0,0 +1,517 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +#include "exec/json_scanner.h" +#include <algorithm> +#include "gutil/strings/split.h" +#include "runtime/exec_env.h" +#include "runtime/mem_tracker.h" +#include "runtime/raw_value.h" +#include "runtime/runtime_state.h" +#include "exprs/expr.h" +#include "env/env.h" +#include "exec/local_file_reader.h" +#include "exec/broker_reader.h" +#include "exprs/json_functions.h" + +namespace doris { + +JsonScanner::JsonScanner(RuntimeState* state, + RuntimeProfile* profile, + const TBrokerScanRangeParams& params, + const std::vector<TBrokerRangeDesc>& ranges, + const std::vector<TNetworkAddress>& broker_addresses, + ScannerCounter* counter) : BaseScanner(state, profile, params, counter), + _ranges(ranges), + _broker_addresses(broker_addresses), + _cur_file_reader(nullptr), + _next_range(0), + _cur_file_eof(false), + _scanner_eof(false) { + +} + +JsonScanner::~JsonScanner() { + close(); +} + +Status JsonScanner::open() { + return BaseScanner::open(); +} + +Status JsonScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof) { + SCOPED_TIMER(_read_timer); + // Get one line + while (!_scanner_eof) { + if (_cur_file_reader == nullptr || _cur_file_eof) { + RETURN_IF_ERROR(open_next_reader()); + // If there isn't any more reader, break this + if (_scanner_eof) { + continue; + } + _cur_file_eof = false; + } + RETURN_IF_ERROR(_cur_file_reader->read(_src_tuple, _src_slot_descs, tuple_pool, &_cur_file_eof)); + + if (_cur_file_eof) { + continue; // read next file + } + COUNTER_UPDATE(_rows_read_counter, 1); + SCOPED_TIMER(_materialize_timer); + if (fill_dest_tuple(Slice(), tuple, tuple_pool)) { + break;// break if true + } + } + if (_scanner_eof) { + *eof = true; + } else { + *eof = false; + } + return Status::OK(); +} + +Status JsonScanner::open_next_reader() { + if (_cur_file_reader != nullptr) { + delete _cur_file_reader; + _cur_file_reader = nullptr; + if (_stream_load_pipe != 
nullptr) { + _stream_load_pipe.reset(); + } + } + if (_next_range >= _ranges.size()) { + _scanner_eof = true; + return Status::OK(); + } + const TBrokerRangeDesc& range = _ranges[_next_range++]; + int64_t start_offset = range.start_offset; + if (start_offset != 0) { + start_offset -= 1; + } + FileReader *file = NULL; + switch (range.file_type) { + case TFileType::FILE_LOCAL: { + LocalFileReader* file_reader = new LocalFileReader(range.path, start_offset); + RETURN_IF_ERROR(file_reader->open()); + file = file_reader; + break; + } + case TFileType::FILE_BROKER: { + BrokerReader* broker_reader = new BrokerReader( + _state->exec_env(), _broker_addresses, _params.properties, range.path, start_offset); + RETURN_IF_ERROR(broker_reader->open()); + file = broker_reader; + break; + } + + case TFileType::FILE_STREAM: { + _stream_load_pipe = _state->exec_env()->load_stream_mgr()->get(range.load_id); + if (_stream_load_pipe == nullptr) { + VLOG(3) << "unknown stream load id: " << UniqueId(range.load_id); + return Status::InternalError("unknown stream load id"); + } + file = _stream_load_pipe.get(); + break; + } + default: { + std::stringstream ss; + ss << "Unknown file type, type=" << range.file_type; + return Status::InternalError(ss.str()); + } + } + + std::string jsonpath = ""; + std::string jsonpath_file = ""; + if (range.__isset.jsonpath) { + jsonpath = range.jsonpath; + } else if (range.__isset.jsonpath_file) { + jsonpath_file = range.jsonpath_file; + } + _cur_file_reader = new JsonReader(_state, _counter, _state->exec_env()->small_file_mgr(), _profile, file, jsonpath, jsonpath_file); + + return Status::OK(); +} + +void JsonScanner::close() { + if (_cur_file_reader != nullptr) { + delete _cur_file_reader; + _cur_file_reader = nullptr; + if (_stream_load_pipe != nullptr) { + _stream_load_pipe.reset(); + } + } +} + +////// class JsonDataInternal +JsonDataInternal::JsonDataInternal(rapidjson::Value* v) : + _json_values(v), _iterator(v->Begin()) { +} + +JsonDataInternal::~JsonDataInternal() { + +} +bool JsonDataInternal::is_end() { + return _json_values->End() == _iterator; +} + +rapidjson::Value::ConstValueIterator JsonDataInternal::get_next() { + if (is_end()) { + return nullptr; + } + return _iterator++; +} + + +////// class JsonReader +JsonReader::JsonReader( + RuntimeState* state, ScannerCounter* counter, + SmallFileMgr *fileMgr, + RuntimeProfile* profile, + FileReader* file_reader, + std::string& jsonpath, + std::string& jsonpath_file) : + _next_line(0), + _total_lines(0), + _state(state), + _counter(counter), + _profile(profile), + _file_reader(file_reader), + _closed(false) { + _bytes_read_counter = ADD_COUNTER(_profile, "BytesRead", TUnit::BYTES); + _read_timer = ADD_TIMER(_profile, "FileReadTime"); + + init_jsonpath(fileMgr, jsonpath, jsonpath_file); +} + +JsonReader::~JsonReader() { + close(); +} + +void JsonReader::init_jsonpath(SmallFileMgr *fileMgr, std::string& jsonpath, std::string& jsonpath_file) { + //parse jsonpath + if (!jsonpath.empty()) { + if (!_jsonpath_doc.Parse(jsonpath.c_str()).HasParseError()) { + if (!_jsonpath_doc.HasMember(JsonReader::JSON_PATH) || !_jsonpath_doc[JsonReader::JSON_PATH].IsArray()) { + _parse_jsonpath_flag = -1;// failed, has none object + } else { + _parse_jsonpath_flag = 1;// success + } + } else { + _parse_jsonpath_flag = -1;// parse failed + } + } else if (!jsonpath_file.empty()) { + //Read jsonpath from file, has format: file_id:md5 + _parse_jsonpath_flag = parse_jsonpath_from_file(fileMgr, jsonpath_file); + } else { + _parse_jsonpath_flag = 0; + } + 
return ; +} + +void JsonReader::close() { + if (_closed) { + return; + } + if (typeid(*_file_reader) == typeid(doris::BrokerReader) || typeid(*_file_reader) == typeid(doris::LocalFileReader)) { + _file_reader->close(); + delete _file_reader; + } + _closed = true; +} + +int JsonReader::parse_jsonpath_from_file(SmallFileMgr *smallFileMgr, std::string& fileinfo ) { + std::vector<std::string> parts = strings::Split(fileinfo, ":", strings::SkipWhitespace()); + if (parts.size() != 2) { + LOG(WARNING)<< "parse_jsonpath_from_file Invalid fileinfo: " << fileinfo; + return -1; + } + int64_t file_id = std::stol(parts[0]); + std::string file_path; + Status st = smallFileMgr->get_file(file_id, parts[1], &file_path); + if (!st.ok()) { + return -1; + } + std::unique_ptr<RandomAccessFile> jsonPathFile; + st = Env::Default()->new_random_access_file(file_path, &jsonPathFile); + if (!st.ok()) { + return -1; + } + uint64_t size = 0; + jsonPathFile->size(&size); + if (size == 0) { + return 0; + } + boost::scoped_array<char> pBuf(new char[size]); + Slice slice(pBuf.get(), size); + st = jsonPathFile->read_at(0, slice); + if (!st.ok()) { + return -1; + } + + if (!_jsonpath_doc.Parse(slice.get_data()).HasParseError()) { + if (!_jsonpath_doc.HasMember(JsonReader::JSON_PATH) || !_jsonpath_doc[JsonReader::JSON_PATH].IsArray()) { + return -1;//failed, has none object + } else { + return 1;// success + } + } else { + return -1;// parse failed + } +} + +Status JsonReader::parse_json_doc(bool *eof) { + // read all, must be delete json_str + uint8_t* json_str = nullptr; + size_t length = 0; + RETURN_IF_ERROR(_file_reader->read(&json_str, &length)); + if (length == 0) { + *eof = true; + return Status::OK(); + } + // parse jsondata to JsonDoc + if (_json_doc.Parse((char*)json_str, length).HasParseError()) { + delete[] json_str; + return Status::InternalError("Parse json data for JsonDoc is failed."); + } + delete[] json_str; + return Status::OK(); +} + +size_t JsonReader::get_data_by_jsonpath() { + size_t max_lines = 0; + //iterator jsonpath to find object and save it to Map + _jmap.clear(); + const rapidjson::Value& arrJsonPath = _jsonpath_doc[JsonReader::JSON_PATH]; + for (int i = 0; i < arrJsonPath.Size(); i++) { + const rapidjson::Value& info = arrJsonPath[i]; + if (!info.IsObject() || !info.HasMember("column") || !info.HasMember("value") || + !info["column"].IsString() || !info["value"].IsString()) { + return -1; + } + + std::string column = info["column"].GetString(); + std::string value = info["value"].GetString(); + // if jsonValues is null, because not match in jsondata. 
+ rapidjson::Value* json_values = JsonFunctions::get_json_object_from_parsed_json(value, &_json_doc); + if (json_values == NULL) { + return -1; + } + if (json_values->IsArray()) { + max_lines = std::max(max_lines, (size_t)json_values->Size()); + } else { + max_lines = std::max(max_lines, (size_t)1); + } + _jmap.emplace(column, json_values); + } + return max_lines; +} + +void JsonReader::fill_slot(Tuple* tuple, SlotDescriptor* slot_desc, MemPool* mem_pool, const uint8_t* value, int32_t len) { + tuple->set_not_null(slot_desc->null_indicator_offset()); + void* slot = tuple->get_slot(slot_desc->tuple_offset()); + StringValue* str_slot = reinterpret_cast<StringValue*>(slot); + str_slot->ptr = reinterpret_cast<char*>(mem_pool->allocate(len)); + memcpy(str_slot->ptr, value, len); + str_slot->len = len; + return; +} + +Status JsonReader::write_data_to_tuple(rapidjson::Value::ConstValueIterator value, SlotDescriptor* desc, Tuple* tuple, MemPool* tuple_pool) { + const char *str_value = NULL; + uint8_t tmp_buf[128] = {0}; + int32_t wbytes = 0; + switch (value->GetType()) { + case rapidjson::Type::kStringType: + str_value = value->GetString(); + fill_slot(tuple, desc, tuple_pool, (uint8_t*)str_value, strlen(str_value)); + break; + case rapidjson::Type::kNumberType: + if (value->IsUint()) { + wbytes = sprintf((char*)tmp_buf, "%u", value->GetUint()); + fill_slot(tuple, desc, tuple_pool, tmp_buf, wbytes); + } else if (value->IsInt()) { + wbytes = sprintf((char*)tmp_buf, "%d", value->GetInt()); + fill_slot(tuple, desc, tuple_pool, tmp_buf, wbytes); + } else if (value->IsUint64()) { + wbytes = sprintf((char*)tmp_buf, "%lu", value->GetUint64()); + fill_slot(tuple, desc, tuple_pool, tmp_buf, wbytes); + } else if (value->IsInt64()) { + wbytes = sprintf((char*)tmp_buf, "%ld", value->GetInt64()); + fill_slot(tuple, desc, tuple_pool, tmp_buf, wbytes); + } else { + wbytes = sprintf((char*)tmp_buf, "%f", value->GetDouble()); + fill_slot(tuple, desc, tuple_pool, tmp_buf, wbytes); + } + break; + case rapidjson::Type::kFalseType: + //fill_slot(tuple, desc, tuple_pool, (uint8_t*)"false", 5); Review comment: Remove unused code ########## File path: be/src/exec/json_scanner.cpp ########## @@ -0,0 +1,517 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ + +#include "exec/json_scanner.h" +#include <algorithm> +#include "gutil/strings/split.h" +#include "runtime/exec_env.h" +#include "runtime/mem_tracker.h" +#include "runtime/raw_value.h" +#include "runtime/runtime_state.h" +#include "exprs/expr.h" +#include "env/env.h" +#include "exec/local_file_reader.h" +#include "exec/broker_reader.h" +#include "exprs/json_functions.h" + +namespace doris { + +JsonScanner::JsonScanner(RuntimeState* state, + RuntimeProfile* profile, + const TBrokerScanRangeParams& params, + const std::vector<TBrokerRangeDesc>& ranges, + const std::vector<TNetworkAddress>& broker_addresses, + ScannerCounter* counter) : BaseScanner(state, profile, params, counter), + _ranges(ranges), + _broker_addresses(broker_addresses), + _cur_file_reader(nullptr), + _next_range(0), + _cur_file_eof(false), + _scanner_eof(false) { + +} + +JsonScanner::~JsonScanner() { + close(); +} + +Status JsonScanner::open() { + return BaseScanner::open(); +} + +Status JsonScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof) { + SCOPED_TIMER(_read_timer); + // Get one line + while (!_scanner_eof) { + if (_cur_file_reader == nullptr || _cur_file_eof) { + RETURN_IF_ERROR(open_next_reader()); + // If there isn't any more reader, break this + if (_scanner_eof) { + continue; + } + _cur_file_eof = false; + } + RETURN_IF_ERROR(_cur_file_reader->read(_src_tuple, _src_slot_descs, tuple_pool, &_cur_file_eof)); + + if (_cur_file_eof) { + continue; // read next file + } + COUNTER_UPDATE(_rows_read_counter, 1); + SCOPED_TIMER(_materialize_timer); + if (fill_dest_tuple(Slice(), tuple, tuple_pool)) { + break;// break if true + } + } + if (_scanner_eof) { + *eof = true; + } else { + *eof = false; + } + return Status::OK(); +} + +Status JsonScanner::open_next_reader() { + if (_cur_file_reader != nullptr) { + delete _cur_file_reader; + _cur_file_reader = nullptr; + if (_stream_load_pipe != nullptr) { + _stream_load_pipe.reset(); + } + } + if (_next_range >= _ranges.size()) { + _scanner_eof = true; + return Status::OK(); + } + const TBrokerRangeDesc& range = _ranges[_next_range++]; + int64_t start_offset = range.start_offset; + if (start_offset != 0) { + start_offset -= 1; + } + FileReader *file = NULL; + switch (range.file_type) { + case TFileType::FILE_LOCAL: { + LocalFileReader* file_reader = new LocalFileReader(range.path, start_offset); + RETURN_IF_ERROR(file_reader->open()); + file = file_reader; + break; + } + case TFileType::FILE_BROKER: { + BrokerReader* broker_reader = new BrokerReader( + _state->exec_env(), _broker_addresses, _params.properties, range.path, start_offset); + RETURN_IF_ERROR(broker_reader->open()); + file = broker_reader; + break; + } + + case TFileType::FILE_STREAM: { + _stream_load_pipe = _state->exec_env()->load_stream_mgr()->get(range.load_id); + if (_stream_load_pipe == nullptr) { + VLOG(3) << "unknown stream load id: " << UniqueId(range.load_id); + return Status::InternalError("unknown stream load id"); + } + file = _stream_load_pipe.get(); + break; + } + default: { + std::stringstream ss; + ss << "Unknown file type, type=" << range.file_type; + return Status::InternalError(ss.str()); + } + } + + std::string jsonpath = ""; + std::string jsonpath_file = ""; + if (range.__isset.jsonpath) { + jsonpath = range.jsonpath; + } else if (range.__isset.jsonpath_file) { + jsonpath_file = range.jsonpath_file; + } + _cur_file_reader = new JsonReader(_state, _counter, _state->exec_env()->small_file_mgr(), _profile, file, jsonpath, jsonpath_file); + + return Status::OK(); +} + +void 
JsonScanner::close() { + if (_cur_file_reader != nullptr) { + delete _cur_file_reader; + _cur_file_reader = nullptr; + if (_stream_load_pipe != nullptr) { + _stream_load_pipe.reset(); + } + } +} + +////// class JsonDataInternal +JsonDataInternal::JsonDataInternal(rapidjson::Value* v) : + _json_values(v), _iterator(v->Begin()) { +} + +JsonDataInternal::~JsonDataInternal() { + +} +bool JsonDataInternal::is_end() { + return _json_values->End() == _iterator; +} + +rapidjson::Value::ConstValueIterator JsonDataInternal::get_next() { + if (is_end()) { + return nullptr; + } + return _iterator++; +} + + +////// class JsonReader +JsonReader::JsonReader( + RuntimeState* state, ScannerCounter* counter, + SmallFileMgr *fileMgr, + RuntimeProfile* profile, + FileReader* file_reader, + std::string& jsonpath, + std::string& jsonpath_file) : + _next_line(0), + _total_lines(0), + _state(state), + _counter(counter), + _profile(profile), + _file_reader(file_reader), + _closed(false) { + _bytes_read_counter = ADD_COUNTER(_profile, "BytesRead", TUnit::BYTES); + _read_timer = ADD_TIMER(_profile, "FileReadTime"); + + init_jsonpath(fileMgr, jsonpath, jsonpath_file); +} + +JsonReader::~JsonReader() { + close(); +} + +void JsonReader::init_jsonpath(SmallFileMgr *fileMgr, std::string& jsonpath, std::string& jsonpath_file) { + //parse jsonpath + if (!jsonpath.empty()) { + if (!_jsonpath_doc.Parse(jsonpath.c_str()).HasParseError()) { + if (!_jsonpath_doc.HasMember(JsonReader::JSON_PATH) || !_jsonpath_doc[JsonReader::JSON_PATH].IsArray()) { + _parse_jsonpath_flag = -1;// failed, has none object + } else { + _parse_jsonpath_flag = 1;// success + } + } else { + _parse_jsonpath_flag = -1;// parse failed + } + } else if (!jsonpath_file.empty()) { + //Read jsonpath from file, has format: file_id:md5 + _parse_jsonpath_flag = parse_jsonpath_from_file(fileMgr, jsonpath_file); + } else { + _parse_jsonpath_flag = 0; + } + return ; +} + +void JsonReader::close() { + if (_closed) { + return; + } + if (typeid(*_file_reader) == typeid(doris::BrokerReader) || typeid(*_file_reader) == typeid(doris::LocalFileReader)) { + _file_reader->close(); + delete _file_reader; + } + _closed = true; +} + +int JsonReader::parse_jsonpath_from_file(SmallFileMgr *smallFileMgr, std::string& fileinfo ) { + std::vector<std::string> parts = strings::Split(fileinfo, ":", strings::SkipWhitespace()); + if (parts.size() != 2) { + LOG(WARNING)<< "parse_jsonpath_from_file Invalid fileinfo: " << fileinfo; + return -1; + } + int64_t file_id = std::stol(parts[0]); + std::string file_path; + Status st = smallFileMgr->get_file(file_id, parts[1], &file_path); + if (!st.ok()) { + return -1; + } + std::unique_ptr<RandomAccessFile> jsonPathFile; + st = Env::Default()->new_random_access_file(file_path, &jsonPathFile); + if (!st.ok()) { + return -1; + } + uint64_t size = 0; + jsonPathFile->size(&size); + if (size == 0) { + return 0; + } + boost::scoped_array<char> pBuf(new char[size]); + Slice slice(pBuf.get(), size); + st = jsonPathFile->read_at(0, slice); + if (!st.ok()) { + return -1; + } + + if (!_jsonpath_doc.Parse(slice.get_data()).HasParseError()) { + if (!_jsonpath_doc.HasMember(JsonReader::JSON_PATH) || !_jsonpath_doc[JsonReader::JSON_PATH].IsArray()) { + return -1;//failed, has none object + } else { + return 1;// success + } + } else { + return -1;// parse failed + } +} + +Status JsonReader::parse_json_doc(bool *eof) { + // read all, must be delete json_str + uint8_t* json_str = nullptr; + size_t length = 0; + RETURN_IF_ERROR(_file_reader->read(&json_str, 
&length)); + if (length == 0) { + *eof = true; + return Status::OK(); + } + // parse jsondata to JsonDoc + if (_json_doc.Parse((char*)json_str, length).HasParseError()) { + delete[] json_str; + return Status::InternalError("Parse json data for JsonDoc is failed."); + } + delete[] json_str; + return Status::OK(); +} + +size_t JsonReader::get_data_by_jsonpath() { + size_t max_lines = 0; + //iterator jsonpath to find object and save it to Map + _jmap.clear(); + const rapidjson::Value& arrJsonPath = _jsonpath_doc[JsonReader::JSON_PATH]; + for (int i = 0; i < arrJsonPath.Size(); i++) { + const rapidjson::Value& info = arrJsonPath[i]; + if (!info.IsObject() || !info.HasMember("column") || !info.HasMember("value") || + !info["column"].IsString() || !info["value"].IsString()) { + return -1; + } + + std::string column = info["column"].GetString(); + std::string value = info["value"].GetString(); + // if jsonValues is null, because not match in jsondata. + rapidjson::Value* json_values = JsonFunctions::get_json_object_from_parsed_json(value, &_json_doc); + if (json_values == NULL) { + return -1; + } + if (json_values->IsArray()) { + max_lines = std::max(max_lines, (size_t)json_values->Size()); + } else { + max_lines = std::max(max_lines, (size_t)1); + } + _jmap.emplace(column, json_values); + } + return max_lines; +} + +void JsonReader::fill_slot(Tuple* tuple, SlotDescriptor* slot_desc, MemPool* mem_pool, const uint8_t* value, int32_t len) { + tuple->set_not_null(slot_desc->null_indicator_offset()); + void* slot = tuple->get_slot(slot_desc->tuple_offset()); + StringValue* str_slot = reinterpret_cast<StringValue*>(slot); + str_slot->ptr = reinterpret_cast<char*>(mem_pool->allocate(len)); + memcpy(str_slot->ptr, value, len); + str_slot->len = len; + return; +} + +Status JsonReader::write_data_to_tuple(rapidjson::Value::ConstValueIterator value, SlotDescriptor* desc, Tuple* tuple, MemPool* tuple_pool) { + const char *str_value = NULL; + uint8_t tmp_buf[128] = {0}; + int32_t wbytes = 0; + switch (value->GetType()) { + case rapidjson::Type::kStringType: + str_value = value->GetString(); + fill_slot(tuple, desc, tuple_pool, (uint8_t*)str_value, strlen(str_value)); + break; + case rapidjson::Type::kNumberType: + if (value->IsUint()) { + wbytes = sprintf((char*)tmp_buf, "%u", value->GetUint()); + fill_slot(tuple, desc, tuple_pool, tmp_buf, wbytes); + } else if (value->IsInt()) { + wbytes = sprintf((char*)tmp_buf, "%d", value->GetInt()); + fill_slot(tuple, desc, tuple_pool, tmp_buf, wbytes); + } else if (value->IsUint64()) { + wbytes = sprintf((char*)tmp_buf, "%lu", value->GetUint64()); + fill_slot(tuple, desc, tuple_pool, tmp_buf, wbytes); + } else if (value->IsInt64()) { + wbytes = sprintf((char*)tmp_buf, "%ld", value->GetInt64()); + fill_slot(tuple, desc, tuple_pool, tmp_buf, wbytes); + } else { + wbytes = sprintf((char*)tmp_buf, "%f", value->GetDouble()); + fill_slot(tuple, desc, tuple_pool, tmp_buf, wbytes); + } + break; + case rapidjson::Type::kFalseType: + //fill_slot(tuple, desc, tuple_pool, (uint8_t*)"false", 5); + fill_slot(tuple, desc, tuple_pool, (uint8_t*)"0", 1); + break; + case rapidjson::Type::kTrueType: + //fill_slot(tuple, desc, tuple_pool, (uint8_t*)"true", 4); + fill_slot(tuple, desc, tuple_pool, (uint8_t*)"1", 1); + break; + case rapidjson::Type::kNullType: + if (desc->is_nullable()) { + tuple->set_null(desc->null_indicator_offset()); + } else { + std::stringstream str_error; + str_error << "Json value is null, but the column `" << desc->col_name() << "` is not nullable."; + LOG(WARNING) 
<< str_error.str(); Review comment: Not print any log here ########## File path: be/src/exec/json_scanner.cpp ########## @@ -0,0 +1,517 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +#include "exec/json_scanner.h" +#include <algorithm> +#include "gutil/strings/split.h" +#include "runtime/exec_env.h" +#include "runtime/mem_tracker.h" +#include "runtime/raw_value.h" +#include "runtime/runtime_state.h" +#include "exprs/expr.h" +#include "env/env.h" +#include "exec/local_file_reader.h" +#include "exec/broker_reader.h" +#include "exprs/json_functions.h" + +namespace doris { + +JsonScanner::JsonScanner(RuntimeState* state, + RuntimeProfile* profile, + const TBrokerScanRangeParams& params, + const std::vector<TBrokerRangeDesc>& ranges, + const std::vector<TNetworkAddress>& broker_addresses, + ScannerCounter* counter) : BaseScanner(state, profile, params, counter), + _ranges(ranges), + _broker_addresses(broker_addresses), + _cur_file_reader(nullptr), + _next_range(0), + _cur_file_eof(false), + _scanner_eof(false) { + +} + +JsonScanner::~JsonScanner() { + close(); +} + +Status JsonScanner::open() { + return BaseScanner::open(); +} + +Status JsonScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof) { + SCOPED_TIMER(_read_timer); + // Get one line + while (!_scanner_eof) { + if (_cur_file_reader == nullptr || _cur_file_eof) { + RETURN_IF_ERROR(open_next_reader()); + // If there isn't any more reader, break this + if (_scanner_eof) { + continue; + } + _cur_file_eof = false; + } + RETURN_IF_ERROR(_cur_file_reader->read(_src_tuple, _src_slot_descs, tuple_pool, &_cur_file_eof)); + + if (_cur_file_eof) { + continue; // read next file + } + COUNTER_UPDATE(_rows_read_counter, 1); + SCOPED_TIMER(_materialize_timer); + if (fill_dest_tuple(Slice(), tuple, tuple_pool)) { + break;// break if true + } + } + if (_scanner_eof) { + *eof = true; + } else { + *eof = false; + } + return Status::OK(); +} + +Status JsonScanner::open_next_reader() { + if (_cur_file_reader != nullptr) { + delete _cur_file_reader; + _cur_file_reader = nullptr; + if (_stream_load_pipe != nullptr) { + _stream_load_pipe.reset(); + } + } + if (_next_range >= _ranges.size()) { + _scanner_eof = true; + return Status::OK(); + } + const TBrokerRangeDesc& range = _ranges[_next_range++]; + int64_t start_offset = range.start_offset; + if (start_offset != 0) { + start_offset -= 1; + } + FileReader *file = NULL; + switch (range.file_type) { + case TFileType::FILE_LOCAL: { + LocalFileReader* file_reader = new LocalFileReader(range.path, start_offset); + RETURN_IF_ERROR(file_reader->open()); + file = file_reader; + break; + } + case TFileType::FILE_BROKER: { + BrokerReader* broker_reader = new BrokerReader( + _state->exec_env(), _broker_addresses, _params.properties, range.path, 
start_offset); + RETURN_IF_ERROR(broker_reader->open()); + file = broker_reader; + break; + } + + case TFileType::FILE_STREAM: { + _stream_load_pipe = _state->exec_env()->load_stream_mgr()->get(range.load_id); + if (_stream_load_pipe == nullptr) { + VLOG(3) << "unknown stream load id: " << UniqueId(range.load_id); + return Status::InternalError("unknown stream load id"); + } + file = _stream_load_pipe.get(); + break; + } + default: { + std::stringstream ss; + ss << "Unknown file type, type=" << range.file_type; + return Status::InternalError(ss.str()); + } + } + + std::string jsonpath = ""; + std::string jsonpath_file = ""; + if (range.__isset.jsonpath) { + jsonpath = range.jsonpath; + } else if (range.__isset.jsonpath_file) { + jsonpath_file = range.jsonpath_file; + } + _cur_file_reader = new JsonReader(_state, _counter, _state->exec_env()->small_file_mgr(), _profile, file, jsonpath, jsonpath_file); + + return Status::OK(); +} + +void JsonScanner::close() { + if (_cur_file_reader != nullptr) { + delete _cur_file_reader; + _cur_file_reader = nullptr; + if (_stream_load_pipe != nullptr) { + _stream_load_pipe.reset(); + } + } +} + +////// class JsonDataInternal +JsonDataInternal::JsonDataInternal(rapidjson::Value* v) : + _json_values(v), _iterator(v->Begin()) { +} + +JsonDataInternal::~JsonDataInternal() { + +} +bool JsonDataInternal::is_end() { + return _json_values->End() == _iterator; +} + +rapidjson::Value::ConstValueIterator JsonDataInternal::get_next() { + if (is_end()) { + return nullptr; + } + return _iterator++; +} + + +////// class JsonReader +JsonReader::JsonReader( + RuntimeState* state, ScannerCounter* counter, + SmallFileMgr *fileMgr, + RuntimeProfile* profile, + FileReader* file_reader, + std::string& jsonpath, + std::string& jsonpath_file) : + _next_line(0), + _total_lines(0), + _state(state), + _counter(counter), + _profile(profile), + _file_reader(file_reader), + _closed(false) { + _bytes_read_counter = ADD_COUNTER(_profile, "BytesRead", TUnit::BYTES); + _read_timer = ADD_TIMER(_profile, "FileReadTime"); + + init_jsonpath(fileMgr, jsonpath, jsonpath_file); +} + +JsonReader::~JsonReader() { + close(); +} + +void JsonReader::init_jsonpath(SmallFileMgr *fileMgr, std::string& jsonpath, std::string& jsonpath_file) { + //parse jsonpath + if (!jsonpath.empty()) { + if (!_jsonpath_doc.Parse(jsonpath.c_str()).HasParseError()) { + if (!_jsonpath_doc.HasMember(JsonReader::JSON_PATH) || !_jsonpath_doc[JsonReader::JSON_PATH].IsArray()) { + _parse_jsonpath_flag = -1;// failed, has none object + } else { + _parse_jsonpath_flag = 1;// success + } + } else { + _parse_jsonpath_flag = -1;// parse failed + } + } else if (!jsonpath_file.empty()) { + //Read jsonpath from file, has format: file_id:md5 + _parse_jsonpath_flag = parse_jsonpath_from_file(fileMgr, jsonpath_file); + } else { + _parse_jsonpath_flag = 0; + } + return ; +} + +void JsonReader::close() { + if (_closed) { + return; + } + if (typeid(*_file_reader) == typeid(doris::BrokerReader) || typeid(*_file_reader) == typeid(doris::LocalFileReader)) { + _file_reader->close(); + delete _file_reader; + } + _closed = true; +} + +int JsonReader::parse_jsonpath_from_file(SmallFileMgr *smallFileMgr, std::string& fileinfo ) { + std::vector<std::string> parts = strings::Split(fileinfo, ":", strings::SkipWhitespace()); + if (parts.size() != 2) { + LOG(WARNING)<< "parse_jsonpath_from_file Invalid fileinfo: " << fileinfo; + return -1; + } + int64_t file_id = std::stol(parts[0]); + std::string file_path; + Status st = smallFileMgr->get_file(file_id, 
parts[1], &file_path); + if (!st.ok()) { + return -1; + } + std::unique_ptr<RandomAccessFile> jsonPathFile; + st = Env::Default()->new_random_access_file(file_path, &jsonPathFile); + if (!st.ok()) { + return -1; + } + uint64_t size = 0; + jsonPathFile->size(&size); + if (size == 0) { + return 0; + } + boost::scoped_array<char> pBuf(new char[size]); + Slice slice(pBuf.get(), size); + st = jsonPathFile->read_at(0, slice); + if (!st.ok()) { + return -1; + } + + if (!_jsonpath_doc.Parse(slice.get_data()).HasParseError()) { + if (!_jsonpath_doc.HasMember(JsonReader::JSON_PATH) || !_jsonpath_doc[JsonReader::JSON_PATH].IsArray()) { + return -1;//failed, has none object + } else { + return 1;// success + } + } else { + return -1;// parse failed + } +} + +Status JsonReader::parse_json_doc(bool *eof) { + // read all, must be delete json_str + uint8_t* json_str = nullptr; + size_t length = 0; + RETURN_IF_ERROR(_file_reader->read(&json_str, &length)); + if (length == 0) { + *eof = true; + return Status::OK(); + } + // parse jsondata to JsonDoc + if (_json_doc.Parse((char*)json_str, length).HasParseError()) { + delete[] json_str; + return Status::InternalError("Parse json data for JsonDoc is failed."); + } + delete[] json_str; + return Status::OK(); +} + +size_t JsonReader::get_data_by_jsonpath() { + size_t max_lines = 0; + //iterator jsonpath to find object and save it to Map + _jmap.clear(); + const rapidjson::Value& arrJsonPath = _jsonpath_doc[JsonReader::JSON_PATH]; Review comment: ```suggestion const rapidjson::Value& arr_json_path = _jsonpath_doc[JsonReader::JSON_PATH]; ``` ########## File path: gensrc/thrift/BackendService.thrift ########## @@ -54,11 +55,12 @@ struct TRoutineLoadTask { 6: optional string db 7: optional string tbl 8: optional string label - 9: optional i64 max_interval_s - 10: optional i64 max_batch_rows - 11: optional i64 max_batch_size - 12: optional TKafkaLoadInfo kafka_load_info - 13: optional PaloInternalService.TExecPlanFragmentParams params + 9: optional PlanNodes.TFileFormatType format Review comment: Agree ########## File path: be/src/exec/json_scanner.cpp ########## @@ -0,0 +1,517 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ + +#include "exec/json_scanner.h" +#include <algorithm> +#include "gutil/strings/split.h" +#include "runtime/exec_env.h" +#include "runtime/mem_tracker.h" +#include "runtime/raw_value.h" +#include "runtime/runtime_state.h" +#include "exprs/expr.h" +#include "env/env.h" +#include "exec/local_file_reader.h" +#include "exec/broker_reader.h" +#include "exprs/json_functions.h" + +namespace doris { + +JsonScanner::JsonScanner(RuntimeState* state, + RuntimeProfile* profile, + const TBrokerScanRangeParams& params, + const std::vector<TBrokerRangeDesc>& ranges, + const std::vector<TNetworkAddress>& broker_addresses, + ScannerCounter* counter) : BaseScanner(state, profile, params, counter), + _ranges(ranges), + _broker_addresses(broker_addresses), + _cur_file_reader(nullptr), + _next_range(0), + _cur_file_eof(false), + _scanner_eof(false) { + +} + +JsonScanner::~JsonScanner() { + close(); +} + +Status JsonScanner::open() { + return BaseScanner::open(); +} + +Status JsonScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof) { + SCOPED_TIMER(_read_timer); + // Get one line + while (!_scanner_eof) { + if (_cur_file_reader == nullptr || _cur_file_eof) { + RETURN_IF_ERROR(open_next_reader()); + // If there isn't any more reader, break this + if (_scanner_eof) { + continue; + } + _cur_file_eof = false; + } + RETURN_IF_ERROR(_cur_file_reader->read(_src_tuple, _src_slot_descs, tuple_pool, &_cur_file_eof)); + + if (_cur_file_eof) { + continue; // read next file + } + COUNTER_UPDATE(_rows_read_counter, 1); + SCOPED_TIMER(_materialize_timer); + if (fill_dest_tuple(Slice(), tuple, tuple_pool)) { + break;// break if true + } + } + if (_scanner_eof) { + *eof = true; + } else { + *eof = false; + } + return Status::OK(); +} + +Status JsonScanner::open_next_reader() { + if (_cur_file_reader != nullptr) { + delete _cur_file_reader; + _cur_file_reader = nullptr; + if (_stream_load_pipe != nullptr) { + _stream_load_pipe.reset(); + } + } + if (_next_range >= _ranges.size()) { + _scanner_eof = true; + return Status::OK(); + } + const TBrokerRangeDesc& range = _ranges[_next_range++]; + int64_t start_offset = range.start_offset; + if (start_offset != 0) { + start_offset -= 1; + } + FileReader *file = NULL; + switch (range.file_type) { + case TFileType::FILE_LOCAL: { + LocalFileReader* file_reader = new LocalFileReader(range.path, start_offset); + RETURN_IF_ERROR(file_reader->open()); + file = file_reader; + break; + } + case TFileType::FILE_BROKER: { + BrokerReader* broker_reader = new BrokerReader( + _state->exec_env(), _broker_addresses, _params.properties, range.path, start_offset); + RETURN_IF_ERROR(broker_reader->open()); + file = broker_reader; + break; + } + + case TFileType::FILE_STREAM: { + _stream_load_pipe = _state->exec_env()->load_stream_mgr()->get(range.load_id); + if (_stream_load_pipe == nullptr) { + VLOG(3) << "unknown stream load id: " << UniqueId(range.load_id); + return Status::InternalError("unknown stream load id"); + } + file = _stream_load_pipe.get(); + break; + } + default: { + std::stringstream ss; + ss << "Unknown file type, type=" << range.file_type; + return Status::InternalError(ss.str()); + } + } + + std::string jsonpath = ""; + std::string jsonpath_file = ""; + if (range.__isset.jsonpath) { + jsonpath = range.jsonpath; + } else if (range.__isset.jsonpath_file) { + jsonpath_file = range.jsonpath_file; + } + _cur_file_reader = new JsonReader(_state, _counter, _state->exec_env()->small_file_mgr(), _profile, file, jsonpath, jsonpath_file); + + return Status::OK(); +} + +void 
JsonScanner::close() { + if (_cur_file_reader != nullptr) { + delete _cur_file_reader; + _cur_file_reader = nullptr; + if (_stream_load_pipe != nullptr) { + _stream_load_pipe.reset(); + } + } +} + +////// class JsonDataInternal +JsonDataInternal::JsonDataInternal(rapidjson::Value* v) : + _json_values(v), _iterator(v->Begin()) { +} + +JsonDataInternal::~JsonDataInternal() { + +} +bool JsonDataInternal::is_end() { + return _json_values->End() == _iterator; +} + +rapidjson::Value::ConstValueIterator JsonDataInternal::get_next() { + if (is_end()) { + return nullptr; + } + return _iterator++; +} + + +////// class JsonReader +JsonReader::JsonReader( + RuntimeState* state, ScannerCounter* counter, + SmallFileMgr *fileMgr, + RuntimeProfile* profile, + FileReader* file_reader, + std::string& jsonpath, + std::string& jsonpath_file) : + _next_line(0), + _total_lines(0), + _state(state), + _counter(counter), + _profile(profile), + _file_reader(file_reader), + _closed(false) { + _bytes_read_counter = ADD_COUNTER(_profile, "BytesRead", TUnit::BYTES); + _read_timer = ADD_TIMER(_profile, "FileReadTime"); + + init_jsonpath(fileMgr, jsonpath, jsonpath_file); +} + +JsonReader::~JsonReader() { + close(); +} + +void JsonReader::init_jsonpath(SmallFileMgr *fileMgr, std::string& jsonpath, std::string& jsonpath_file) { + //parse jsonpath + if (!jsonpath.empty()) { + if (!_jsonpath_doc.Parse(jsonpath.c_str()).HasParseError()) { + if (!_jsonpath_doc.HasMember(JsonReader::JSON_PATH) || !_jsonpath_doc[JsonReader::JSON_PATH].IsArray()) { + _parse_jsonpath_flag = -1;// failed, has none object + } else { + _parse_jsonpath_flag = 1;// success + } + } else { + _parse_jsonpath_flag = -1;// parse failed + } + } else if (!jsonpath_file.empty()) { + //Read jsonpath from file, has format: file_id:md5 + _parse_jsonpath_flag = parse_jsonpath_from_file(fileMgr, jsonpath_file); + } else { + _parse_jsonpath_flag = 0; + } + return ; +} + +void JsonReader::close() { + if (_closed) { + return; + } + if (typeid(*_file_reader) == typeid(doris::BrokerReader) || typeid(*_file_reader) == typeid(doris::LocalFileReader)) { + _file_reader->close(); + delete _file_reader; + } + _closed = true; +} + +int JsonReader::parse_jsonpath_from_file(SmallFileMgr *smallFileMgr, std::string& fileinfo ) { + std::vector<std::string> parts = strings::Split(fileinfo, ":", strings::SkipWhitespace()); + if (parts.size() != 2) { + LOG(WARNING)<< "parse_jsonpath_from_file Invalid fileinfo: " << fileinfo; + return -1; + } + int64_t file_id = std::stol(parts[0]); + std::string file_path; + Status st = smallFileMgr->get_file(file_id, parts[1], &file_path); + if (!st.ok()) { + return -1; + } + std::unique_ptr<RandomAccessFile> jsonPathFile; + st = Env::Default()->new_random_access_file(file_path, &jsonPathFile); + if (!st.ok()) { + return -1; + } + uint64_t size = 0; + jsonPathFile->size(&size); + if (size == 0) { + return 0; + } + boost::scoped_array<char> pBuf(new char[size]); + Slice slice(pBuf.get(), size); + st = jsonPathFile->read_at(0, slice); + if (!st.ok()) { + return -1; + } + + if (!_jsonpath_doc.Parse(slice.get_data()).HasParseError()) { + if (!_jsonpath_doc.HasMember(JsonReader::JSON_PATH) || !_jsonpath_doc[JsonReader::JSON_PATH].IsArray()) { + return -1;//failed, has none object + } else { + return 1;// success + } + } else { + return -1;// parse failed + } +} + +Status JsonReader::parse_json_doc(bool *eof) { + // read all, must be delete json_str + uint8_t* json_str = nullptr; + size_t length = 0; + RETURN_IF_ERROR(_file_reader->read(&json_str, 
&length)); + if (length == 0) { + *eof = true; + return Status::OK(); + } + // parse jsondata to JsonDoc + if (_json_doc.Parse((char*)json_str, length).HasParseError()) { + delete[] json_str; + return Status::InternalError("Parse json data for JsonDoc is failed."); + } + delete[] json_str; + return Status::OK(); +} + +size_t JsonReader::get_data_by_jsonpath() { + size_t max_lines = 0; + //iterator jsonpath to find object and save it to Map + _jmap.clear(); + const rapidjson::Value& arrJsonPath = _jsonpath_doc[JsonReader::JSON_PATH]; + for (int i = 0; i < arrJsonPath.Size(); i++) { + const rapidjson::Value& info = arrJsonPath[i]; + if (!info.IsObject() || !info.HasMember("column") || !info.HasMember("value") || + !info["column"].IsString() || !info["value"].IsString()) { + return -1; + } + + std::string column = info["column"].GetString(); + std::string value = info["value"].GetString(); + // if jsonValues is null, because not match in jsondata. + rapidjson::Value* json_values = JsonFunctions::get_json_object_from_parsed_json(value, &_json_doc); + if (json_values == NULL) { + return -1; + } + if (json_values->IsArray()) { + max_lines = std::max(max_lines, (size_t)json_values->Size()); + } else { + max_lines = std::max(max_lines, (size_t)1); + } + _jmap.emplace(column, json_values); + } + return max_lines; +} + +void JsonReader::fill_slot(Tuple* tuple, SlotDescriptor* slot_desc, MemPool* mem_pool, const uint8_t* value, int32_t len) { + tuple->set_not_null(slot_desc->null_indicator_offset()); + void* slot = tuple->get_slot(slot_desc->tuple_offset()); + StringValue* str_slot = reinterpret_cast<StringValue*>(slot); + str_slot->ptr = reinterpret_cast<char*>(mem_pool->allocate(len)); + memcpy(str_slot->ptr, value, len); + str_slot->len = len; + return; +} + +Status JsonReader::write_data_to_tuple(rapidjson::Value::ConstValueIterator value, SlotDescriptor* desc, Tuple* tuple, MemPool* tuple_pool) { + const char *str_value = NULL; + uint8_t tmp_buf[128] = {0}; + int32_t wbytes = 0; + switch (value->GetType()) { + case rapidjson::Type::kStringType: + str_value = value->GetString(); + fill_slot(tuple, desc, tuple_pool, (uint8_t*)str_value, strlen(str_value)); + break; + case rapidjson::Type::kNumberType: + if (value->IsUint()) { + wbytes = sprintf((char*)tmp_buf, "%u", value->GetUint()); + fill_slot(tuple, desc, tuple_pool, tmp_buf, wbytes); + } else if (value->IsInt()) { + wbytes = sprintf((char*)tmp_buf, "%d", value->GetInt()); + fill_slot(tuple, desc, tuple_pool, tmp_buf, wbytes); + } else if (value->IsUint64()) { + wbytes = sprintf((char*)tmp_buf, "%lu", value->GetUint64()); + fill_slot(tuple, desc, tuple_pool, tmp_buf, wbytes); + } else if (value->IsInt64()) { + wbytes = sprintf((char*)tmp_buf, "%ld", value->GetInt64()); + fill_slot(tuple, desc, tuple_pool, tmp_buf, wbytes); + } else { + wbytes = sprintf((char*)tmp_buf, "%f", value->GetDouble()); + fill_slot(tuple, desc, tuple_pool, tmp_buf, wbytes); + } + break; + case rapidjson::Type::kFalseType: + //fill_slot(tuple, desc, tuple_pool, (uint8_t*)"false", 5); + fill_slot(tuple, desc, tuple_pool, (uint8_t*)"0", 1); + break; + case rapidjson::Type::kTrueType: + //fill_slot(tuple, desc, tuple_pool, (uint8_t*)"true", 4); + fill_slot(tuple, desc, tuple_pool, (uint8_t*)"1", 1); + break; + case rapidjson::Type::kNullType: + if (desc->is_nullable()) { + tuple->set_null(desc->null_indicator_offset()); + } else { + std::stringstream str_error; + str_error << "Json value is null, but the column `" << desc->col_name() << "` is not nullable."; + LOG(WARNING) 
<< str_error.str(); + return Status::RuntimeError(str_error.str()); + } + break; + default: + std::stringstream str_error; + str_error << "Invalid JsonType " << value->GetType() << ", Column Name `" << desc->col_name() << "`."; + LOG(WARNING) << str_error.str(); Review comment: Not print any log here ########## File path: fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java ########## @@ -270,8 +281,20 @@ protected void setOptional(CreateRoutineLoadStmt stmt) throws UserException { if (stmt.getMaxBatchSize() != -1) { this.maxBatchSizeBytes = stmt.getMaxBatchSize(); } + if (stmt.getFormat().equals("json")) { + this.format = "json"; + if (stmt.getJsonPath() != null && !stmt.getJsonPath().isEmpty()) { Review comment: ```suggestion if (!Strings.isNullOrEmpty(stmt.getJsonPath())) { ``` ########## File path: fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java ########## @@ -328,6 +357,19 @@ private void checkJobProperties() throws UserException { timezone = ConnectContext.get().getSessionVariable().getTimeZone(); } timezone = TimeUtils.checkTimeZoneValidAndStandardize(jobProperties.getOrDefault(LoadStmt.TIMEZONE, timezone)); + + format = jobProperties.get(FORMAT); + if (format != null) { + if (!format.equalsIgnoreCase("json")) { + format = "";// if it's not json, then it's mean csv and set empty Review comment: I think we can just throw an exception here, means user specified a format which we don't support. ########## File path: fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java ########## @@ -175,6 +175,17 @@ public boolean isFinalState() { protected long maxBatchRows = DEFAULT_MAX_BATCH_ROWS; protected long maxBatchSizeBytes = DEFAULT_MAX_BATCH_SIZE; + /** + * RoutineLoad support json data. + * Require Params: + * 1) format = "json" + * 2) jsonPathFile = "/XXX/xx/jsonpath.json" or jsonPath = "$.XXX.xxx" Review comment: ```suggestion * 2) jsonPathFile = "FILE:jsonpath.json" or jsonPath = "$.XXX.xxx" ``` ########## File path: fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java ########## @@ -270,8 +281,20 @@ protected void setOptional(CreateRoutineLoadStmt stmt) throws UserException { if (stmt.getMaxBatchSize() != -1) { this.maxBatchSizeBytes = stmt.getMaxBatchSize(); } + if (stmt.getFormat().equals("json")) { + this.format = "json"; + if (stmt.getJsonPath() != null && !stmt.getJsonPath().isEmpty()) { + this.jsonPath = stmt.getJsonPath(); + } else if (stmt.getJsonPathFile() != null && !stmt.getJsonPathFile().isEmpty()) { Review comment: ```suggestion } else if (!Strings.isNullOrEmpty(stmt.getJsonPathFile()) { ``` ########## File path: docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md ########## @@ -120,7 +120,8 @@ under the License. 9. 导入含有BITMAP列的表,可以是表中的列或者数据中的列用于生成BITMAP列,也可以使用bitmap_empty填充空的Bitmap curl --location-trusted -u root -H "columns: k1, k2, v1=to_bitmap(k1), v2=bitmap_empty()" -T testData http://host:port/api/testDb/testTbl/_stream_load - + 10. 导入json数据格式 Review comment: And a example about how to specify the `jsonpath_file` in stream load. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 