This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 3f31866  [Bug][Load][Json] #4124 Load json format with stream load failed (#4217)
3f31866 is described below

commit 3f31866169dcd1a76c79026da42a384fda5c2c79
Author: Mingyu Chen <morningman....@gmail.com>
AuthorDate: Tue Aug 4 12:55:53 2020 +0800

    [Bug][Load][Json] #4124 Load json format with stream load failed (#4217)

    Stream load should read all the data completely before parsing the json.
    Also add a new BE config streaming_load_max_batch_size_mb to limit the data size when loading json data.
    Fix the bug of loading an empty json array [].
    Add docs to explain certain cases of loading json format data.

    Fix: #4124
---
 be/src/common/config.h                              |   7 +-
 be/src/exec/base_scanner.cpp                        |   2 +-
 be/src/exec/base_scanner.h                          |   2 +-
 be/src/exec/broker_scanner.cpp                      |   2 +-
 be/src/exec/json_scanner.cpp                        |  30 ++++--
 be/src/exec/orc_scanner.cpp                         |   2 +-
 be/src/exec/parquet_scanner.cpp                     |   2 +-
 be/src/http/action/stream_load.cpp                  |  18 +++-
 be/src/runtime/stream_load/stream_load_pipe.h       |  82 ++++++++++----
 docs/en/administrator-guide/config/be_config.md     |  16 +++
 .../load-data/load-json-format.md                   | 120 ++++++++++++++++++++-
 docs/zh-CN/administrator-guide/config/be_config.md  |  16 +++
 .../load-data/load-json-format.md                   | 118 ++++++++++++++++++++
 13 files changed, 379 insertions(+), 38 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h index 31a62b7..1538638 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -303,11 +303,14 @@ namespace config { CONF_Int64(load_data_reserve_hours, "4"); // log error log will be removed after this time CONF_mInt64(load_error_log_reserve_hours, "48"); - // Deprecated, use streaming_load_max_mb instead - // CONF_Int64(mini_load_max_mb, "2048"); CONF_Int32(number_tablet_writer_threads, "16"); + // The maximum amount of data that can be processed by a stream load CONF_mInt64(streaming_load_max_mb, "10240"); + // Some data formats, such as JSON, cannot be streamed. + // Therefore, it is necessary to limit the maximum amount of + // such data when using stream load to prevent excessive memory consumption. + CONF_mInt64(streaming_load_max_batch_size_mb, "100"); // the alive time of a TabletsChannel. // If the channel does not receive any data till this time, // the channel will be removed. 
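A note for readers trying out the new limit: the option is declared with CONF_mInt64, so it is dynamically modifiable, as the documentation changes further down also state. The following sketch shows two ways it might be adjusted; the be.conf entry, the example value 200, the BE webserver port 8040, and the update_config endpoint are assumptions based on common BE defaults, not part of this commit.

```
# Persistent setting in conf/be.conf (illustrative; value is in MB, patch default is 100):
#   streaming_load_max_batch_size_mb = 200

# Runtime change, assuming the BE exposes the update_config HTTP endpoint on port 8040:
curl -X POST "http://127.0.0.1:8040/api/update_config?streaming_load_max_batch_size_mb=200"
```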
diff --git a/be/src/exec/base_scanner.cpp b/be/src/exec/base_scanner.cpp index 73e6d5b..8acc1c5 100644 --- a/be/src/exec/base_scanner.cpp +++ b/be/src/exec/base_scanner.cpp @@ -136,7 +136,7 @@ Status BaseScanner::init_expr_ctxes() { return Status::OK(); } -bool BaseScanner::fill_dest_tuple(const Slice& line, Tuple* dest_tuple, MemPool* mem_pool) { +bool BaseScanner::fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool) { int ctx_idx = 0; for (auto slot_desc : _dest_tuple_desc->slots()) { if (!slot_desc->is_materialized()) { diff --git a/be/src/exec/base_scanner.h b/be/src/exec/base_scanner.h index d233502..751a020 100644 --- a/be/src/exec/base_scanner.h +++ b/be/src/exec/base_scanner.h @@ -59,7 +59,7 @@ public: // Close this scanner virtual void close() = 0; - bool fill_dest_tuple(const Slice& line, Tuple* dest_tuple, MemPool* mem_pool); + bool fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool); void fill_slots_of_columns_from_path(int start, const std::vector<std::string>& columns_from_path); diff --git a/be/src/exec/broker_scanner.cpp b/be/src/exec/broker_scanner.cpp index a854bde..6a2b23b 100644 --- a/be/src/exec/broker_scanner.cpp +++ b/be/src/exec/broker_scanner.cpp @@ -395,7 +395,7 @@ bool BrokerScanner::convert_one_row( if (!line_to_src_tuple(line)) { return false; } - return fill_dest_tuple(line, tuple, tuple_pool); + return fill_dest_tuple(tuple, tuple_pool); } // Convert one row to this tuple diff --git a/be/src/exec/json_scanner.cpp b/be/src/exec/json_scanner.cpp index ca6cd56..0c0c2dd 100644 --- a/be/src/exec/json_scanner.cpp +++ b/be/src/exec/json_scanner.cpp @@ -73,8 +73,8 @@ Status JsonScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof) { } COUNTER_UPDATE(_rows_read_counter, 1); SCOPED_TIMER(_materialize_timer); - if (fill_dest_tuple(Slice(), tuple, tuple_pool)) { - break;// break if true + if (fill_dest_tuple(tuple, tuple_pool)) { + break; // break if true } } if (_scanner_eof) { @@ -399,6 +399,15 @@ void JsonReader::_write_data_to_tuple(rapidjson::Value::ConstValueIterator value // for simple format json void JsonReader::_set_tuple_value(rapidjson::Value& objectValue, Tuple* tuple, const std::vector<SlotDescriptor*>& slot_descs, MemPool* tuple_pool, bool *valid) { + if (!objectValue.IsObject()) { + // Here we expect the incoming `objectValue` to be a Json Object, such as {"key" : "value"}, + // not other type of Json format. 
+ _state->append_error_msg_to_file(_print_json_value(objectValue), "Expect json object value"); + _counter->num_rows_filtered++; + *valid = false; // current row is invalid + return; + } + int nullcount = 0; for (auto v : slot_descs) { if (objectValue.HasMember(v->col_name().c_str())) { @@ -443,20 +452,29 @@ void JsonReader::_set_tuple_value(rapidjson::Value& objectValue, Tuple* tuple, c Status JsonReader::_handle_simple_json(Tuple* tuple, const std::vector<SlotDescriptor*>& slot_descs, MemPool* tuple_pool, bool* eof) { do { bool valid = false; - if (_next_line >= _total_lines) {//parse json and generic document + if (_next_line >= _total_lines) { // parse json and generic document Status st = _parse_json_doc(eof); if (st.is_data_quality_error()) { continue; // continue to read next } RETURN_IF_ERROR(st); // terminate if encounter other errors - if (*eof) {// read all data, then return + if (*eof) { // read all data, then return return Status::OK(); } if (_json_doc->IsArray()) { _total_lines = _json_doc->Size(); + if (_total_lines == 0) { + // may be passing an empty json, such as "[]" + std::stringstream str_error; + str_error << "Empty json line"; + _state->append_error_msg_to_file(_print_json_value(*_json_doc), str_error.str()); + _counter->num_rows_filtered++; + continue; + } } else { _total_lines = 1; // only one row } + _next_line = 0; } @@ -534,7 +552,7 @@ Status JsonReader::_handle_nested_complex_json(Tuple* tuple, const std::vector<S if (*eof) { return Status::OK();// read over,then return } - break; //read a valid row + break; // read a valid row } _write_values_by_jsonpath(*_json_doc, tuple_pool, tuple, slot_descs); return Status::OK(); @@ -558,7 +576,7 @@ Status JsonReader::_handle_flat_array_complex_json(Tuple* tuple, const std::vect continue; // continue to read next } RETURN_IF_ERROR(st); // terminate if encounter other errors - if (*eof) {// read all data, then return + if (*eof) { // read all data, then return return Status::OK(); } _total_lines = _json_doc->Size(); diff --git a/be/src/exec/orc_scanner.cpp b/be/src/exec/orc_scanner.cpp index 7edf204..847b465 100644 --- a/be/src/exec/orc_scanner.cpp +++ b/be/src/exec/orc_scanner.cpp @@ -326,7 +326,7 @@ Status ORCScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof) { } COUNTER_UPDATE(_rows_read_counter, 1); SCOPED_TIMER(_materialize_timer); - if (fill_dest_tuple(Slice(), tuple, tuple_pool)) { + if (fill_dest_tuple(tuple, tuple_pool)) { break; // get one line, break from while } // else skip this line and continue get_next to return } diff --git a/be/src/exec/parquet_scanner.cpp b/be/src/exec/parquet_scanner.cpp index cb22687..d2e69e9 100644 --- a/be/src/exec/parquet_scanner.cpp +++ b/be/src/exec/parquet_scanner.cpp @@ -80,7 +80,7 @@ Status ParquetScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof) { COUNTER_UPDATE(_rows_read_counter, 1); SCOPED_TIMER(_materialize_timer); - if (fill_dest_tuple(Slice(), tuple, tuple_pool)) { + if (fill_dest_tuple(tuple, tuple_pool)) { break;// break if true } } diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index 52e16a5..294ea16 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -218,7 +218,7 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, StreamLoadContext* ct LOG(WARNING) << "body exceed max size." 
<< ctx->brief(); std::stringstream ss; - ss << "body exceed max size, max_body_bytes=" << max_body_bytes; + ss << "body exceed max size: " << max_body_bytes << ", limit: " << max_body_bytes; return Status::InternalError(ss.str()); } } else { @@ -234,11 +234,20 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, StreamLoadContext* ct } else { ctx->format = parse_format(http_req->header(HTTP_FORMAT_KEY)); if (ctx->format == TFileFormatType::FORMAT_UNKNOWN) { - LOG(WARNING) << "unknown data format." << ctx->brief(); std::stringstream ss; ss << "unknown data format, format=" << http_req->header(HTTP_FORMAT_KEY); return Status::InternalError(ss.str()); } + + if (ctx->format == TFileFormatType::FORMAT_JSON) { + size_t max_body_bytes = config::streaming_load_max_batch_size_mb * 1024 * 1024; + if (ctx->body_bytes > max_body_bytes) { + std::stringstream ss; + ss << "The size of this batch exceed the max size [" << max_body_bytes + << "] of json type data " << " data [ " << ctx->body_bytes << " ]"; + return Status::InternalError(ss.str()); + } + } } if (!http_req->header(HTTP_TIMEOUT).empty()) { @@ -312,7 +321,10 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext* request.formatType = ctx->format; request.__set_loadId(ctx->id.to_thrift()); if (ctx->use_streaming) { - auto pipe = std::make_shared<StreamLoadPipe>(); + auto pipe = std::make_shared<StreamLoadPipe>( + 1024 * 1024 /* max_buffered_bytes */, + 64 * 1024 /* min_chunk_size */, + ctx->body_bytes /* total_length */); RETURN_IF_ERROR(_exec_env->load_stream_mgr()->put(ctx->id, pipe)); request.fileType = TFileType::FILE_STREAM; ctx->body_sink = pipe; diff --git a/be/src/runtime/stream_load/stream_load_pipe.h b/be/src/runtime/stream_load/stream_load_pipe.h index 2fc4a9b..3deb404 100644 --- a/be/src/runtime/stream_load/stream_load_pipe.h +++ b/be/src/runtime/stream_load/stream_load_pipe.h @@ -33,10 +33,12 @@ namespace doris { class StreamLoadPipe : public MessageBodySink, public FileReader { public: StreamLoadPipe(size_t max_buffered_bytes = 1024 * 1024, - size_t min_chunk_size = 64 * 1024) + size_t min_chunk_size = 64 * 1024, + int64_t total_length = -1) : _buffered_bytes(0), _max_buffered_bytes(max_buffered_bytes), _min_chunk_size(min_chunk_size), + _total_length(total_length), _finished(false), _cancelled(false) { } virtual ~StreamLoadPipe() { } @@ -84,31 +86,33 @@ public: return _append(buf); } + // If _total_length == -1, this should be a Kafka routine load task, + // just get the next buffer directly from the buffer queue, because one buffer contains a complete piece of data. + // Otherwise, this should be a stream load task that needs to read the specified amount of data. 
Status read_one_message(uint8_t** data, size_t* length) override { - std::unique_lock<std::mutex> l(_lock); - while (!_cancelled && !_finished && _buf_queue.empty()) { - _get_cond.wait(l); - } - // cancelled - if (_cancelled) { - return Status::InternalError("cancelled"); - } - // finished - if (_buf_queue.empty()) { - DCHECK(_finished); - *data = nullptr; + if (_total_length < -1) { + std::stringstream ss; + ss << "invalid, _total_length is: " << _total_length; + return Status::InternalError(ss.str()); + } else if (_total_length == 0) { + // no data *length = 0; return Status::OK(); } - auto buf = _buf_queue.front(); - *length = buf->remaining(); - *data = new uint8_t[*length]; - buf->get_bytes((char*)(*data) , *length); - _buf_queue.pop_front(); - _buffered_bytes -= buf->limit; - _put_cond.notify_one(); - return Status::OK(); + if (_total_length == -1) { + return _read_next_buffer(data, length); + } + + // _total_length > 0, read the entire data + *data = new uint8_t[_total_length]; + *length = _total_length; + bool eof = false; + Status st = read(*data, length, &eof); + if (eof) { + *length = 0; + } + return st; } Status read(uint8_t* data, size_t* data_size, bool* eof) override { @@ -196,6 +200,34 @@ public: } private: + // read the next buffer from _buf_queue + Status _read_next_buffer(uint8_t** data, size_t* length) { + std::unique_lock<std::mutex> l(_lock); + while (!_cancelled && !_finished && _buf_queue.empty()) { + _get_cond.wait(l); + } + // cancelled + if (_cancelled) { + return Status::InternalError("cancelled"); + } + // finished + if (_buf_queue.empty()) { + DCHECK(_finished); + *data = nullptr; + *length = 0; + return Status::OK(); + } + auto buf = _buf_queue.front(); + *length = buf->remaining(); + *data = new uint8_t[*length]; + buf->get_bytes((char*)(*data) , *length); + + _buf_queue.pop_front(); + _buffered_bytes -= buf->limit; + _put_cond.notify_one(); + return Status::OK(); + } + Status _append(const ByteBufferPtr& buf) { { std::unique_lock<std::mutex> l(_lock); @@ -221,6 +253,14 @@ private: size_t _buffered_bytes; size_t _max_buffered_bytes; size_t _min_chunk_size; + // The total amount of data expected to be read. + // In some scenarios, such as loading json format data through stream load, + // the data needs to be completely read before it can be parsed, + // so the total size of the data needs to be known. + // The default is -1, which means that the data arrives in a stream + // and the length is unknown. + // size_t is unsigned, so use int64_t + int64_t _total_length = -1; std::deque<ByteBufferPtr> _buf_queue; std::condition_variable _put_cond; std::condition_variable _get_cond; diff --git a/docs/en/administrator-guide/config/be_config.md b/docs/en/administrator-guide/config/be_config.md index 196cd5e..bf6c0bf 100644 --- a/docs/en/administrator-guide/config/be_config.md +++ b/docs/en/administrator-guide/config/be_config.md @@ -437,6 +437,22 @@ Indicates how many tablets in this data directory failed to load. At the same ti ### `streaming_load_max_mb` +* Type: int64 +* Description: Used to limit the maximum amount of data allowed in one Stream load. The unit is MB. +* Default value: 10240 +* Dynamically modify: yes + +Stream Load is generally suitable for loading data less than a few GB, not suitable for loading` too large data. + +### `streaming_load_max_batch_size_mb` + +* Type: int64 +* Description: For some data formats, such as JSON, it is used to limit the maximum amount of data allowed in one Stream load. The unit is MB. 
+* Default value: 100 +* Dynamically modify: yes + +Some data formats, such as JSON, cannot be split. Doris must read all the data into memory before parsing can begin. Therefore, this value is used to limit the maximum amount of data that can be loaded in a single Stream load. + ### `streaming_load_rpc_max_alive_time_sec` ### `sync_tablet_meta` diff --git a/docs/en/administrator-guide/load-data/load-json-format.md b/docs/en/administrator-guide/load-data/load-json-format.md index 14caff6..c57ec99 100644 --- a/docs/en/administrator-guide/load-data/load-json-format.md +++ b/docs/en/administrator-guide/load-data/load-json-format.md @@ -183,6 +183,124 @@ Doris supports extracting the data specified in Json through Json Path. Will result in a complete match failure, the line will be marked as an error row, instead of producing `null, null`. +## Json Path and Columns + +Json Path specifies how to extract data from the JSON format, while Columns specifies the mapping and conversion of the extracted fields to table columns. The two can be used together, as in the following example. + +Data content: + +``` +{"k1": 1, "k2": 2} +``` + +Table schema: + +`k2 int, k1 int` + +Load statement 1 (using Stream Load as an example): + +``` +curl -v --location-trusted -u root: -H "format: json" -H "jsonpaths: [\"$.k2\", \"$.k1\"]" -T example.json http://127.0.0.1:8030/api/db1/tbl1/_stream_load +``` + +In load statement 1, only Json Path is specified and Columns is not. Json Path extracts the Json data in the order of the fields listed in the Json Path, and the extracted values are then written in the order of the table schema. The final loaded data results are as follows: + +``` ++------+------+ +| k1 | k2 | ++------+------+ +| 2 | 1 | ++------+------+ +``` + +You can see that the k1 column in the table actually loaded the value of the "k2" field in the Json data. This is because the field names in Json are not equivalent to the column names in the table schema; the mapping between the two must be specified explicitly. + +Load statement 2: + +``` +curl -v --location-trusted -u root: -H "format: json" -H "jsonpaths: [\"$.k2\", \"$.k1\"]" -H "columns: k2, k1" -T example.json http://127.0.0.1:8030/api/db1/tbl1/_stream_load +``` + +Compared with load statement 1, the Columns field is added here to describe the mapping relationship of the columns, in the order of `k2, k1`. That is, after extracting in the order of the fields in the Json Path, the first column is designated as the value of the k2 column in the table, and the second column as the value of the k1 column in the table. The final loaded data results are as follows: + +``` ++------+------+ +| k1 | k2 | ++------+------+ +| 1 | 2 | ++------+------+ +``` + +Of course, as with other load methods, you can perform column transformations in Columns. For example: + +``` +curl -v --location-trusted -u root: -H "format: json" -H "jsonpaths: [\"$.k2\", \"$.k1\"]" -H "columns: k2, tmp_k1, k1 = tmp_k1 * 100" -T example.json http://127.0.0.1:8030/api/db1/tbl1/_stream_load +``` + +The above example multiplies the value of k1 by 100 before loading it. 
The final loaded data results are as follows: + +``` ++------+------+ +| k1 | k2 | ++------+------+ +| 100 | 2 | ++------+------+ +``` + +## NULL and Default value + +The sample data is as follows: + +``` +[ + {"k1": 1, "k2": "a"}, + {"k1": 2}, + {"k1": 3, "k2": "c"} +] +``` + +The table schema is: `k1 int null, k2 varchar(32) null default "x"` + +The load statement is as follows: + +``` +curl -v --location-trusted -u root: -H "format: json" -H "strip_outer_array: true" -T example.json http://127.0.0.1:8030/api/db1/tbl1/_stream_load +``` + +The load result that users may expect is as follows, that is, the missing column is filled with its default value. + +``` ++------+------+ +| k1 | k2 | ++------+------+ +| 1 | a | ++------+------+ +| 2 | x | ++------+------+ +| 3 | c | ++------+------+ +``` + +But the actual load result is as follows, that is, the missing column is filled with NULL. + +``` ++------+------+ +| k1 | k2 | ++------+------+ +| 1 | a | ++------+------+ +| 2 | NULL | ++------+------+ +| 3 | c | ++------+------+ +``` + +This is because, from the information in the load statement alone, Doris cannot know that "the missing column is the k2 column in the table". +If you want to load the above data as expected, the load statement is as follows: + +``` +curl -v --location-trusted -u root: -H "format: json" -H "strip_outer_array: true" -H "jsonpaths: [\"$.k1\", \"$.k2\"]" -H "columns: k1, tmp_k2, k2 = ifnull(tmp_k2, 'x')" -T example.json http://127.0.0.1:8030/api/db1/tbl1/_stream_load +``` ## Examples @@ -305,4 +423,4 @@ code INT NULL Routine Load processes Json data the same as Stream Load. I will not repeat them here. -For the Kafka data source, the content of each Massage is treated as a complete Json data. If multiple rows of data expressed in Array format in a Massage are loaded, multiple rows will be loaded, and Kafka's offset will only increase by 1. If an Array format Json represents multiple rows of data, but because the Json format error causes the parsing Json to fail, the error row will only increase by 1 (because the parsing fails, in fact, Doris cannot determine how many rows of data it con [...] \ No newline at end of file +For the Kafka data source, the content of each Massage is treated as a complete Json data. If multiple rows of data expressed in Array format in a Massage are loaded, multiple rows will be loaded, and Kafka's offset will only increase by 1. If an Array format Json represents multiple rows of data, but because the Json format error causes the parsing Json to fail, the error row will only increase by 1 (because the parsing fails, in fact, Doris cannot determine how many rows of data it con [...] 
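To illustrate the empty-array fix in json_scanner.cpp above: with this patch, a bare `[]` is reported as a filtered row with the error message "Empty json line" instead of tripping the bug described in #4124. A minimal sketch follows, in the style of the doc examples above; the host, port, database and table names are placeholders, and the overall load response still depends on your max_filter_ratio setting.

```
echo '[]' > empty.json
curl -v --location-trusted -u root: -H "format: json" -H "strip_outer_array: true" \
    -T empty.json http://127.0.0.1:8030/api/db1/tbl1/_stream_load
```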
diff --git a/docs/zh-CN/administrator-guide/config/be_config.md b/docs/zh-CN/administrator-guide/config/be_config.md index 658ce21..79712ed 100644 --- a/docs/zh-CN/administrator-guide/config/be_config.md +++ b/docs/zh-CN/administrator-guide/config/be_config.md @@ -436,6 +436,22 @@ load tablets from header failed, failed tablets size: xxx, path=xxx ### `streaming_load_max_mb` +* 类型:int64 +* 描述:用于限制一次 Stream load 导入中,允许的最大数据量。单位 MB。 +* 默认值: 10240 +* 可动态修改:是 + +Stream Load 一般适用于导入几个GB以内的数据,不适合导入过大的数据。 + +### `streaming_load_max_batch_size_mb` + +* 类型:int64 +* 描述:对于某些数据格式,如 JSON,用于限制一次 Stream load 导入中,允许的最大数据量。单位 MB。 +* 默认值: 100 +* 可动态修改:是 + +一些数据格式,如 JSON,无法进行拆分处理,必须读取全部数据到内存后才能开始解析,因此,这个值用于限制此类格式数据单次导入最大数据量。 + ### `streaming_load_rpc_max_alive_time_sec` ### `sync_tablet_meta` diff --git a/docs/zh-CN/administrator-guide/load-data/load-json-format.md b/docs/zh-CN/administrator-guide/load-data/load-json-format.md index 8780f77..5acc418 100644 --- a/docs/zh-CN/administrator-guide/load-data/load-json-format.md +++ b/docs/zh-CN/administrator-guide/load-data/load-json-format.md @@ -185,6 +185,124 @@ Doris 支持通过 Json Path 抽取 Json 中指定的数据。 则会导致完全匹配失败,则该行会标记为错误行,而不是产出 `null, null`。 +## Json Path 和 Columns + +Json Path 用于指定如何对 JSON 格式中的数据进行抽取,而 Columns 指定列的映射和转换关系。两者可以配合使用,举例如下。 + +数据内容: + +``` +{"k1" : 1, "k2": 2} +``` + +表结构: + +`k2 int, k1 int` + +导入语句1(以 Stream Load 为例): + +``` +curl -v --location-trusted -u root: -H "format: json" -H "jsonpaths: [\"$.k2\", \"$.k1\"]" -T example.json http://127.0.0.1:8030/api/db1/tbl1/_stream_load +``` + +导入语句1中,仅指定了 Json Path,没有指定 Columns。其中 Json Path 的作用是将 Json 数据按照 Json Path 中字段的顺序进行抽取,之后会按照表结构的顺序进行写入。最终导入的数据结果如下: + +``` ++------+------+ +| k1 | k2 | ++------+------+ +| 2 | 1 | ++------+------+ +``` + +会看到,实际的 k1 列导入了 Json 数据中的 "k2" 列的值。这是因为,Json 中字段名称并不等同于表结构中字段的名称。我们需要显式的指定这两者之间的映射关系。 + +导入语句2: + +``` +curl -v --location-trusted -u root: -H "format: json" -H "jsonpaths: [\"$.k2\", \"$.k1\"]" -H "columns: k2, k1" -T example.json http://127.0.0.1:8030/api/db1/tbl1/_stream_load +``` + +相比如导入语句1,这里增加了 Columns 字段,用于描述列的映射关系,按 `k2, k1` 的顺序。即按Json Path 中字段的顺序抽取后,指定第一列为表中 k2 列的值,而第二列为表中 k1 列的值。最终导入的数据结果如下: + +``` ++------+------+ +| k1 | k2 | ++------+------+ +| 1 | 2 | ++------+------+ +``` + +当然,如其他导入一样,可以在 Columns 中进行列的转换操作。示例如下: + +``` +curl -v --location-trusted -u root: -H "format: json" -H "jsonpaths: [\"$.k2\", \"$.k1\"]" -H "columns: k2, tmp_k1, k1 = tmp_k1 * 100" -T example.json http://127.0.0.1:8030/api/db1/tbl1/_stream_load +``` + +上述示例会将 k1 的值乘以 100 后导入。最终导入的数据结果如下: + +``` ++------+------+ +| k1 | k2 | ++------+------+ +| 100 | 2 | ++------+------+ +``` + +## NULL 和 Default 值 + +示例数据如下: + +``` +[ + {"k1": 1, "k2": "a"}, + {"k1": 2}, + {"k1": 3, "k2": "c"}, +] +``` + +表结构为:`k1 int null, k2 varchar(32) null default "x"` + +导入语句如下: + +``` +curl -v --location-trusted -u root: -H "format: json" -H "strip_outer_array: true" -T example.json http://127.0.0.1:8030/api/db1/tbl1/_stream_load +``` + +用户可能期望的导入结果如下,即对于缺失的列,填写默认值。 + +``` ++------+------+ +| k1 | k2 | ++------+------+ +| 1 | a | ++------+------+ +| 2 | x | ++------+------+ +| 3 | c | ++------+------+ +``` + +但实际的导入结果如下,即对于缺失的列,补上了 NULL。 + +``` ++------+------+ +| k1 | k2 | ++------+------+ +| 1 | a | ++------+------+ +| 2 | NULL | ++------+------+ +| 3 | c | ++------+------+ +``` + +这是因为通过导入语句中的信息,Doris 并不知道 “缺失的列是表中的 k2 列”。 +如果要对以上数据按照期望结果导入,则导入语句如下: + +``` +curl -v --location-trusted -u root: -H "format: json" -H "strip_outer_array: true" -H "jsonpaths: [\"$.k1\", \"$.k2\"]" -H "columns: k1, tmp_k2, k2 
= ifnull(tmp_k2, 'x')" -T example.json http://127.0.0.1:8030/api/db1/tbl1/_stream_load +``` ## 应用示例 --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org