This is an automated email from the ASF dual-hosted git repository. caiconghui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new d9c128b [BrokerLoad] Support read properties for broker load when read data (#5845) d9c128b is described below commit d9c128b7444f6066f0095894f39f8c70f8355419 Author: caiconghui <55968745+caicong...@users.noreply.github.com> AuthorDate: Wed Jun 9 14:59:55 2021 +0800 [BrokerLoad] Support read properties for broker load when read data (#5845) * [BrokerLoad] support read properties for broker load when read data Co-authored-by: caiconghui <caicong...@xiaomi.com> --- be/src/exec/broker_reader.cpp | 2 +- .../Data Manipulation/BROKER LOAD.md | 42 +++++++++++++++++ .../Data Manipulation/BROKER LOAD.md | 41 ++++++++++++++++ fe/fe-core/src/main/cup/sql_parser.cup | 6 ++- .../org/apache/doris/analysis/DataDescription.java | 54 +++++++++++++++++++--- .../java/org/apache/doris/analysis/LoadStmt.java | 1 + .../org/apache/doris/load/BrokerFileGroup.java | 13 +++++- .../org/apache/doris/planner/BrokerScanNode.java | 3 ++ .../java/org/apache/doris/qe/MultiLoadMgr.java | 2 +- .../apache/doris/analysis/DataDescriptionTest.java | 32 +++++++++++-- 10 files changed, 179 insertions(+), 17 deletions(-) diff --git a/be/src/exec/broker_reader.cpp b/be/src/exec/broker_reader.cpp index a720704..65d601a 100644 --- a/be/src/exec/broker_reader.cpp +++ b/be/src/exec/broker_reader.cpp @@ -123,7 +123,7 @@ Status BrokerReader::open() { //not support Status BrokerReader::read_one_message(std::unique_ptr<uint8_t[]>* buf, int64_t* length) { - return Status::NotSupported("Not support"); + return Status::NotSupported("broker reader doesn't support read_one_message interface"); } Status BrokerReader::read(uint8_t* buf, int64_t buf_len, int64_t* bytes_read, bool* eof) { diff --git a/docs/en/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md b/docs/en/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md index bb05272..90a7fde 100644 --- a/docs/en/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md +++ 
b/docs/en/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md @@ -73,6 +73,7 @@ under the License. [SET (k1 = func(k2))] [WHERE predicate] [DELETE ON label=true] + [read_properties] Explain: file_path: @@ -132,6 +133,35 @@ under the License. delete_on_predicates: Only used when merge type is MERGE + + read_properties: + + Used to specify some special parameters. + Syntax: + [PROPERTIES ("key"="value", ...)] + + You can specify the following parameters: + + line_delimiter: Used to specify the line delimiter in the load file. The default is `\n`. You can use a combination of multiple characters as the line delimiter. + + fuzzy_parse: Boolean type. If true, the json schema is parsed from the first line, which makes the import faster but requires the keys of every json object to appear in the same order as in the first line. The default value is false. Only used for the json format. + + jsonpaths: There are two ways to import json: simple mode and matched mode. + simple mode: used when the jsonpaths parameter is not set. In this mode, the json data is required to be of object type. For example: + {"k1": 1, "k2": 2, "k3": "hello"}, where k1, k2, k3 are column names. + + matched mode: used when the json data is relatively complex and the corresponding values need to be matched through the jsonpaths parameter. + + strip_outer_array: Boolean type. If true, the json data is expected to start with an array, and the objects in the array are flattened so that each one becomes a row. The default value is false. For example: + [ + {"k1" : 1, "v1" : 2}, + {"k1" : 3, "v1" : 4} + ] + If strip_outer_array is true, two rows of data are generated when the data is imported into Doris. + + json_root: json_root is a valid JSONPATH string that specifies the root node of the JSON Document. The default value is "". + + num_as_string: Boolean type. If true, numeric values are converted into strings when the json data is parsed, so that they are imported without loss of precision. 3.
broker_name @@ -487,6 +517,18 @@ under the License. WHERE k1 > 3 ) with BROKER "hdfs" ("username"="user", "password"="pass"); + + 15. Import the data in the json file, and specify format as json, it is judged by the file suffix by default, set parameters for reading data + + LOAD LABEL example_db.label9 + ( + DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file") + INTO TABLE `my_table` + FORMAT AS "json" + (k1, k2, k3) + properties("fuzzy_parse"="true", "strip_outer_array"="true") + ) + WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password"); ## keyword diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md index 56dfeb5..18240f6 100644 --- a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md +++ b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md @@ -73,6 +73,7 @@ under the License. [WHERE predicate] [DELETE ON label=true] [ORDER BY source_sequence] + [read_properties] 说明: file_path: @@ -132,6 +133,34 @@ under the License. 只适用于UNIQUE_KEYS,相同key列下,保证value列按照source_sequence进行REPLACE, source_sequence可以是数据源中的列,也可以是表结构中的一列。 + read_properties: + + 用于指定一些特殊参数。 + 语法: + [PROPERTIES ("key"="value", ...)] + + 可以指定如下参数: + + line_delimiter: 用于指定导入文件中的换行符,默认为\n。可以使用做多个字符的组合作为换行符。 + + fuzzy_parse: 布尔类型,为true表示json将以第一行为schema 进行解析,开启这个选项可以提高json 导入效率,但是要求所有json 对象的key的顺序和第一行一致, 默认为false,仅用于json格式。 + + jsonpaths: 导入json方式分为:简单模式和匹配模式。 + 简单模式:没有设置jsonpaths参数即为简单模式,这种模式下要求json数据是对象类型,例如: + {"k1":1, "k2":2, "k3":"hello"},其中k1,k2,k3是列名字。 + 匹配模式:用于json数据相对复杂,需要通过jsonpaths参数匹配对应的value。 + + strip_outer_array: 布尔类型,为true表示json数据以数组对象开始且将数组对象中进行展平,默认值是false。例如: + [ + {"k1" : 1, "v1" : 2}, + {"k1" : 3, "v1" : 4} + ] + 当strip_outer_array为true,最后导入到doris中会生成两行数据。 + + json_root: json_root为合法的jsonpath字符串,用于指定json document的根节点,默认值为""。 + + num_as_string: 布尔类型,为true表示在解析json数据时会将数字类型转为字符串,然后在确保不会出现精度丢失的情况下进行导入。 + 3. 
broker_name 所使用的 broker 名称,可以通过 show broker 命令查看。 @@ -506,6 +535,18 @@ under the License. ) with BROKER "hdfs" ("username"="user", "password"="pass"); + 15. 导入json文件中数据 指定FORMAT为json, 默认是通过文件后缀判断,设置读取数据的参数 + + LOAD LABEL example_db.label9 + ( + DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file") + INTO TABLE `my_table` + FORMAT AS "json" + (k1, k2, k3) + properties("fuzzy_parse"="true", "strip_outer_array"="true") + ) + WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password"); + ## keyword BROKER,LOAD diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index c5c19f6..237e703 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -1415,9 +1415,10 @@ data_desc ::= where_clause:whereExpr delete_on_clause:deleteExpr sequence_col_clause:sequenceColName + opt_properties:properties {: RESULT = new DataDescription(tableName, partitionNames, files, colList, colSep, fileFormat, - columnsFromPath, isNeg, colMappingList, preFilterExpr, whereExpr, mergeType, deleteExpr, sequenceColName); + columnsFromPath, isNeg, colMappingList, preFilterExpr, whereExpr, mergeType, deleteExpr, sequenceColName, properties); :} | opt_merge_type:mergeType KW_DATA KW_FROM KW_TABLE ident:srcTableName opt_negative:isNeg @@ -1426,9 +1427,10 @@ data_desc ::= opt_col_mapping_list:colMappingList where_clause:whereExpr delete_on_clause:deleteExpr + opt_properties:properties {: RESULT = new DataDescription(tableName, partitionNames, srcTableName, isNeg, colMappingList, whereExpr, - mergeType, deleteExpr); + mergeType, deleteExpr, properties); :} ; diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java index abee1d1..9f52275 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java @@ 
-120,6 +120,7 @@ public class DataDescription { private String jsonRoot = ""; private boolean fuzzyParse = false; private boolean readJsonByLine = false; + private boolean numAsString = false; private String sequenceCol; @@ -137,6 +138,7 @@ public class DataDescription { private LoadTask.MergeType mergeType = LoadTask.MergeType.APPEND; private final Expr deleteCondition; + private Map<String, String> properties; public DataDescription(String tableName, PartitionNames partitionNames, @@ -147,7 +149,7 @@ public class DataDescription { boolean isNegative, List<Expr> columnMappingList) { this(tableName, partitionNames, filePaths, columns, columnSeparator, fileFormat, null, - isNegative, columnMappingList, null, null, LoadTask.MergeType.APPEND, null, null); + isNegative, columnMappingList, null, null, LoadTask.MergeType.APPEND, null, null, null); } public DataDescription(String tableName, @@ -163,7 +165,8 @@ public class DataDescription { Expr whereExpr, LoadTask.MergeType mergeType, Expr deleteCondition, - String sequenceColName) { + String sequenceColName, + Map<String, String> properties) { this.tableName = tableName; this.partitionNames = partitionNames; this.filePaths = filePaths; @@ -179,6 +182,7 @@ public class DataDescription { this.mergeType = mergeType; this.deleteCondition = deleteCondition; this.sequenceCol = sequenceColName; + this.properties = properties; } // data from table external_hive_table @@ -189,7 +193,8 @@ public class DataDescription { List<Expr> columnMappingList, Expr whereExpr, LoadTask.MergeType mergeType, - Expr deleteCondition) { + Expr deleteCondition, + Map<String, String> properties) { this.tableName = tableName; this.partitionNames = partitionNames; this.filePaths = null; @@ -204,6 +209,7 @@ public class DataDescription { this.srcTableName = srcTableName; this.mergeType = mergeType; this.deleteCondition = deleteCondition; + this.properties = properties; } public static void validateMappingFunction(String functionName, List<String> args, 
@@ -499,12 +505,12 @@ public class DataDescription { this.fuzzyParse = fuzzyParse; } - public boolean isReadJsonByLine() { - return readJsonByLine; + public boolean isNumAsString() { + return numAsString; } - public void setReadJsonByLine(boolean readJsonByLine) { - this.readJsonByLine = readJsonByLine; + public void setNumAsString(boolean numAsString) { + this.numAsString = numAsString; } public String getJsonPaths() { @@ -755,6 +761,36 @@ public class DataDescription { } } + private void analyzeProperties() throws AnalysisException { + Map<String, String> analysisMap = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); + analysisMap.putAll(properties); + + if (analysisMap.containsKey(LoadStmt.KEY_IN_PARAM_LINE_DELIMITER)) { + lineDelimiter = new Separator(analysisMap.get(LoadStmt.KEY_IN_PARAM_LINE_DELIMITER)); + lineDelimiter.analyze(); + } + + if (analysisMap.containsKey(LoadStmt.KEY_IN_PARAM_FUZZY_PARSE)) { + fuzzyParse = Boolean.parseBoolean(analysisMap.get(LoadStmt.KEY_IN_PARAM_FUZZY_PARSE)); + } + + if (analysisMap.containsKey(LoadStmt.KEY_IN_PARAM_STRIP_OUTER_ARRAY)) { + stripOuterArray = Boolean.parseBoolean(analysisMap.get(LoadStmt.KEY_IN_PARAM_STRIP_OUTER_ARRAY)); + } + + if (analysisMap.containsKey(LoadStmt.KEY_IN_PARAM_JSONPATHS)) { + jsonPaths = analysisMap.get(LoadStmt.KEY_IN_PARAM_JSONPATHS); + } + + if (analysisMap.containsKey(LoadStmt.KEY_IN_PARAM_JSONROOT)) { + jsonRoot = analysisMap.get(LoadStmt.KEY_IN_PARAM_JSONROOT); + } + + if (analysisMap.containsKey(LoadStmt.KEY_IN_PARAM_NUM_AS_STRING)) { + numAsString = Boolean.parseBoolean(analysisMap.get(LoadStmt.KEY_IN_PARAM_NUM_AS_STRING)); + } + } + private void checkLoadPriv(String fullDbName) throws AnalysisException { if (Strings.isNullOrEmpty(tableName)) { throw new AnalysisException("No table name in load statement."); @@ -817,6 +853,10 @@ public class DataDescription { analyzeColumns(); analyzeMultiLoadColumns(); analyzeSequenceCol(fullDbName); + + if (properties != null) { + analyzeProperties(); + 
} } /* diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java index 2915618..efcfe5b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java @@ -108,6 +108,7 @@ public class LoadStmt extends DdlStmt { public static final String KEY_IN_PARAM_JSONROOT = "json_root"; public static final String KEY_IN_PARAM_STRIP_OUTER_ARRAY = "strip_outer_array"; public static final String KEY_IN_PARAM_FUZZY_PARSE = "fuzzy_parse"; + public static final String KEY_IN_PARAM_NUM_AS_STRING = "num_as_string"; public static final String KEY_IN_PARAM_MERGE_TYPE = "merge_type"; public static final String KEY_IN_PARAM_DELETE_CONDITION = "delete"; public static final String KEY_IN_PARAM_FUNCTION_COLUMN = "function_column"; diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java index 8c177c8..bc03e6c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java @@ -103,6 +103,7 @@ public class BrokerFileGroup implements Writable { private String jsonRoot = ""; private boolean fuzzyParse = true; private boolean readJsonByLine = false; + private boolean numAsString = false; // for unit test and edit log persistence private BrokerFileGroup() { @@ -237,7 +238,9 @@ public class BrokerFileGroup implements Writable { jsonPaths = dataDescription.getJsonPaths(); jsonRoot = dataDescription.getJsonRoot(); fuzzyParse = dataDescription.isFuzzyParse(); - readJsonByLine = dataDescription.isReadJsonByLine(); + // For broker load, we only support reading json format data line by line, so we set readJsonByLine to true here. 
+ readJsonByLine = true; + numAsString = dataDescription.isNumAsString(); } } @@ -357,6 +360,14 @@ public class BrokerFileGroup implements Writable { this.readJsonByLine = readJsonByLine; } + public boolean isNumAsString() { + return numAsString; + } + + public void setNumAsString(boolean numAsString) { + this.numAsString = numAsString; + } + public String getJsonPaths() { return jsonPaths; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java index 23c68fa..e3df23e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java @@ -445,6 +445,7 @@ public class BrokerScanNode extends LoadScanNode { rangeDesc.setJsonpaths(context.fileGroup.getJsonPaths()); rangeDesc.setJsonRoot(context.fileGroup.getJsonRoot()); rangeDesc.setFuzzyParse(context.fileGroup.isFuzzyParse()); + rangeDesc.setNumAsString(context.fileGroup.isNumAsString()); rangeDesc.setReadJsonByLine(context.fileGroup.isReadJsonByLine()); } brokerScanRange(curLocations).addToRanges(rangeDesc); @@ -471,6 +472,8 @@ public class BrokerScanNode extends LoadScanNode { rangeDesc.setJsonpaths(context.fileGroup.getJsonPaths()); rangeDesc.setJsonRoot(context.fileGroup.getJsonRoot()); rangeDesc.setFuzzyParse(context.fileGroup.isFuzzyParse()); + rangeDesc.setNumAsString(context.fileGroup.isNumAsString()); + rangeDesc.setReadJsonByLine(context.fileGroup.isReadJsonByLine()); } brokerScanRange(curLocations).addToRanges(rangeDesc); curFileOffset = 0; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java b/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java index 8bde1ed..bd78e0d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java @@ -496,7 +496,7 @@ public class MultiLoadMgr { } DataDescription 
dataDescription = new DataDescription(tbl, partitionNames, files, null, columnSeparator, fileFormat, null, isNegative, null, null, whereExpr, mergeType, deleteCondition, - sequenceColName); + sequenceColName, null); dataDescription.setColumnDef(colString); backend = Catalog.getCurrentSystemInfo().getBackend(backendId); if (backend == null) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/DataDescriptionTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/DataDescriptionTest.java index 03f21cc..85395eb 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/analysis/DataDescriptionTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/DataDescriptionTest.java @@ -30,12 +30,14 @@ import org.apache.doris.qe.ConnectContext; import org.apache.doris.system.SystemInfoService; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import java.util.List; +import java.util.Map; import mockit.Expectations; import mockit.Injectable; @@ -119,7 +121,7 @@ public class DataDescriptionTest { Expr whereExpr = new BinaryPredicate(BinaryPredicate.Operator.EQ, new IntLiteral(1), new IntLiteral(1)); desc = new DataDescription("testTable", null, Lists.newArrayList("abc.txt"), - Lists.newArrayList("col1", "col2"), new Separator(","), "csv", null, false, null, null, whereExpr, LoadTask.MergeType.MERGE, whereExpr, null); + Lists.newArrayList("col1", "col2"), new Separator(","), "csv", null, false, null, null, whereExpr, LoadTask.MergeType.MERGE, whereExpr, null, null); desc.analyze("testDb"); Assert.assertEquals("MERGE DATA INFILE ('abc.txt') INTO TABLE testTable COLUMNS TERMINATED BY ',' (col1, col2) WHERE 1 = 1 DELETE ON 1 = 1", desc.toString()); Assert.assertEquals("1 = 1", desc.getWhereExpr().toSql()); @@ -202,10 +204,30 @@ public class DataDescriptionTest { new FunctionCallExpr("bitmap_dict", params)); desc = new DataDescription("testTable", new 
PartitionNames(false, Lists.newArrayList("p1", "p2")), "testHiveTable", false, Lists.newArrayList(predicate), - null, LoadTask.MergeType.APPEND, null); + null, LoadTask.MergeType.APPEND, null, null); desc.analyze("testDb"); sql = "APPEND DATA FROM TABLE testHiveTable INTO TABLE testTable PARTITIONS (p1, p2) SET (`k1` = bitmap_dict(`k2`))"; Assert.assertEquals(sql, desc.toSql()); + + Map<String, String> properties = Maps.newHashMap(); + properties.put("line_delimiter", "abc"); + properties.put("fuzzy_parse", "true"); + properties.put("strip_outer_array", "true"); + properties.put("jsonpaths", "[\"$.h1.h2.k1\",\"$.h1.h2.v1\",\"$.h1.h2.v2\"]"); + properties.put("json_root", "$.RECORDS"); + properties.put("read_json_by_line", "true"); + properties.put("num_as_string","true"); + desc = new DataDescription("testTable", null, Lists.newArrayList("abc.txt"), + Lists.newArrayList("col1", "col2"), new Separator(","), "json", null, false, null, + null, null, LoadTask.MergeType.APPEND, null, null, properties); + + desc.analyze("testDb"); + Assert.assertEquals("abc", desc.getLineDelimiter()); + Assert.assertTrue(desc.isFuzzyParse()); + Assert.assertTrue(desc.isStripOuterArray()); + Assert.assertEquals("[\"$.h1.h2.k1\",\"$.h1.h2.v1\",\"$.h1.h2.v2\"]", desc.getJsonPaths()); + Assert.assertEquals("$.RECORDS", desc.getJsonRoot()); + Assert.assertTrue(desc.isNumAsString()); } @Test(expected = AnalysisException.class) @@ -220,7 +242,7 @@ public class DataDescriptionTest { Expr whereExpr = new BinaryPredicate(BinaryPredicate.Operator.EQ, new IntLiteral(1), new IntLiteral(1)); DataDescription desc = new DataDescription("testTable", null, Lists.newArrayList("abc.txt"), - Lists.newArrayList("col1", "col2"), new Separator(","), "csv", null, true, null, null, whereExpr, LoadTask.MergeType.MERGE, whereExpr, null); + Lists.newArrayList("col1", "col2"), new Separator(","), "csv", null, true, null, null, whereExpr, LoadTask.MergeType.MERGE, whereExpr, null, null); desc.analyze("testDb"); } @@ 
-312,7 +334,7 @@ public class DataDescriptionTest { public void testAnalyzeSequenceColumnNormal() throws AnalysisException { DataDescription desc = new DataDescription("testTable", null, Lists.newArrayList("abc.txt"), Lists.newArrayList("k1", "k2", "source_sequence", "v1"), new Separator("\t"), - null, null, false, null, null, null, LoadTask.MergeType.APPEND, null, "source_sequence"); + null, null, false, null, null, null, LoadTask.MergeType.APPEND, null, "source_sequence", null); new Expectations() { { tbl.getName(); @@ -331,7 +353,7 @@ public class DataDescriptionTest { public void testAnalyzeSequenceColumnWithoutSourceSequence() throws AnalysisException { DataDescription desc = new DataDescription("testTable", null, Lists.newArrayList("abc.txt"), Lists.newArrayList("k1", "k2", "v1"), new Separator("\t"), - null, null, false, null, null, null, LoadTask.MergeType.APPEND, null, "source_sequence"); + null, null, false, null, null, null, LoadTask.MergeType.APPEND, null, "source_sequence", null); new Expectations() { { tbl.getName(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org