This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-1.2-lts in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-1.2-lts by this push: new 681155c68a [feature](load) stream load trim double quotes for csv (#15241) 681155c68a is described below commit 681155c68a92c705792e0f1bf1bf4a99af863255 Author: Xin Liao <liaoxin...@126.com> AuthorDate: Mon Dec 26 11:45:54 2022 +0800 [feature](load) stream load trim double quotes for csv (#15241) --- be/src/http/action/stream_load.cpp | 7 +++ be/src/http/http_common.h | 1 + be/src/vec/exec/format/csv/csv_reader.cpp | 14 +++++ be/src/vec/exec/format/csv/csv_reader.h | 1 + .../Load/STREAM-LOAD.md | 4 +- .../Load/STREAM-LOAD.md | 4 +- .../org/apache/doris/analysis/DataDescription.java | 6 ++ .../org/apache/doris/load/BrokerFileGroup.java | 6 ++ .../apache/doris/planner/StreamLoadScanNode.java | 1 + .../doris/planner/external/LoadScanProvider.java | 1 + .../java/org/apache/doris/task/LoadTaskInfo.java | 4 ++ .../java/org/apache/doris/task/StreamLoadTask.java | 9 +++ gensrc/thrift/FrontendService.thrift | 1 + gensrc/thrift/PlanNodes.thrift | 4 ++ .../load_p0/stream_load/csv_with_double_quotes.csv | 8 +++ .../stream_load/test_csv_with_double_quotes.out | 21 +++++++ .../stream_load/test_csv_with_double_quotes.groovy | 64 ++++++++++++++++++++++ 17 files changed, 154 insertions(+), 2 deletions(-) diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index b2acc0cedb..7a1b9636ec 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -565,6 +565,13 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext* if (!http_req->header(HTTP_HIDDEN_COLUMNS).empty()) { request.__set_hidden_columns(http_req->header(HTTP_HIDDEN_COLUMNS)); } + if (!http_req->header(HTTP_TRIM_DOUBLE_QUOTES).empty()) { + if (iequal(http_req->header(HTTP_TRIM_DOUBLE_QUOTES), "true")) { + request.__set_trim_double_quotes(true); + } else { + request.__set_trim_double_quotes(false); + } + } #ifndef BE_TEST // plan this load diff --git a/be/src/http/http_common.h b/be/src/http/http_common.h index 1ae254b062..e61c828fb4 100644 --- a/be/src/http/http_common.h +++ b/be/src/http/http_common.h @@ -51,6 +51,7 @@ static const std::string HTTP_COMPRESS_TYPE = "compress_type"; static const std::string HTTP_SEND_BATCH_PARALLELISM = "send_batch_parallelism"; static const std::string HTTP_LOAD_TO_SINGLE_TABLET = "load_to_single_tablet"; static const std::string HTTP_HIDDEN_COLUMNS = "hidden_columns"; +static const std::string HTTP_TRIM_DOUBLE_QUOTES = "trim_double_quotes"; static const std::string HTTP_TWO_PHASE_COMMIT = "two_phase_commit"; static const std::string HTTP_TXN_ID_KEY = "txn_id"; diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp index 48f8d84dc7..1eaef7ad87 100644 --- a/be/src/vec/exec/format/csv/csv_reader.cpp +++ b/be/src/vec/exec/format/csv/csv_reader.cpp @@ -124,6 +124,10 @@ Status CsvReader::init_reader(bool is_load) { _line_delimiter = _params.file_attributes.text_params.line_delimiter; _line_delimiter_length = _line_delimiter.size(); + if (_params.file_attributes.__isset.trim_double_quotes) { + _trim_double_quotes = _params.file_attributes.trim_double_quotes; + } + // create decompressor. // _decompressor may be nullptr if this is not a compressed file RETURN_IF_ERROR(_create_decompressor()); @@ -412,6 +416,11 @@ void CsvReader::_split_line(const Slice& line) { non_space--; } } + if (_trim_double_quotes && (non_space - 1) > start && + *(value + start) == '\"' && *(value + non_space - 1) == '\"') { + start++; + non_space--; + } _split_values.emplace_back(value + start, non_space - start); start = curpos + _value_separator_length; curpos = start; @@ -428,6 +437,11 @@ void CsvReader::_split_line(const Slice& line) { non_space--; } } + if (_trim_double_quotes && (non_space - 1) > start && *(value + start) == '\"' && + *(value + non_space - 1) == '\"') { + start++; + non_space--; + } _split_values.emplace_back(value + start, non_space - start); } } diff --git a/be/src/vec/exec/format/csv/csv_reader.h b/be/src/vec/exec/format/csv/csv_reader.h index 5bb14523e3..972c87da3c 100644 --- a/be/src/vec/exec/format/csv/csv_reader.h +++ b/be/src/vec/exec/format/csv/csv_reader.h @@ -107,6 +107,7 @@ private: std::string _line_delimiter; int _value_separator_length; int _line_delimiter_length; + bool _trim_double_quotes = false; // save source text which have been splitted. std::vector<Slice> _split_values; diff --git a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD.md b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD.md index 560cc80ea3..fcb2b92b31 100644 --- a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD.md +++ b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD.md @@ -177,10 +177,12 @@ ERRORS: where url is the url given by ErrorURL. -23: compress_type +24. compress_type Specify compress type file. Only support compressed csv file now. Support gz, lzo, bz2, lz4, lzop, deflate. +25. trim_double_quotes: Boolean type, The default value is false. True means that the outermost double quotes of each field in the csv file are trimmed. + ### Example 1. Import the data in the local file 'testData' into the table 'testTbl' in the database 'testDb', and use Label for deduplication. Specify a timeout of 100 seconds diff --git a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD.md b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD.md index 97473b3d1d..5dadc7f0e7 100644 --- a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD.md +++ b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD.md @@ -174,10 +174,12 @@ ERRORS: 其中 url 为 ErrorURL 给出的 url。 -23: compress_type +24. compress_type 指定文件的压缩格式。目前只支持 csv 文件的压缩。支持 gz, lzo, bz2, lz4, lzop, deflate 压缩格式。 +25. trim_double_quotes: 布尔类型,默认值为 false,为 true 时表示裁剪掉 csv 文件每个字段最外层的双引号。 + ### Example 1. 将本地文件'testData'中的数据导入到数据库'testDb'中'testTbl'的表,使用Label用于去重。指定超时时间为 100 秒 diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java index b67574e743..72b494a89d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java @@ -145,6 +145,7 @@ public class DataDescription { private LoadTask.MergeType mergeType = LoadTask.MergeType.APPEND; private final Expr deleteCondition; private final Map<String, String> properties; + private boolean trimDoubleQuotes = false; public DataDescription(String tableName, PartitionNames partitionNames, @@ -250,6 +251,7 @@ public class DataDescription { this.readJsonByLine = taskInfo.isReadJsonByLine(); this.numAsString = taskInfo.isNumAsString(); this.properties = Maps.newHashMap(); + this.trimDoubleQuotes = taskInfo.getTrimDoubleQuotes(); } private void getFileFormatAndCompressType(LoadTaskInfo taskInfo) { @@ -641,6 +643,10 @@ public class DataDescription { return readJsonByLine; } + public boolean getTrimDoubleQuotes() { + return trimDoubleQuotes; + } + /* * Analyze parsedExprMap and columnToHadoopFunction from columns, columns from path and columnMappingList * Example: diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java index 497f0c2d23..887062bda1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java @@ -105,6 +105,7 @@ public class BrokerFileGroup implements Writable { private boolean fuzzyParse = true; private boolean readJsonByLine = false; private boolean numAsString = false; + private boolean trimDoubleQuotes = false; // for unit test and edit log persistence private BrokerFileGroup() { @@ -262,6 +263,7 @@ public class BrokerFileGroup implements Writable { readJsonByLine = dataDescription.isReadJsonByLine(); numAsString = dataDescription.isNumAsString(); } + trimDoubleQuotes = dataDescription.getTrimDoubleQuotes(); } public long getTableId() { @@ -416,6 +418,10 @@ public class BrokerFileGroup implements Writable { return fileFormat.equalsIgnoreCase("parquet") || fileFormat.equalsIgnoreCase("orc"); } + public boolean getTrimDoubleQuotes() { + return trimDoubleQuotes; + } + @Override public String toString() { StringBuilder sb = new StringBuilder(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java index 96e8410aef..621c7a9306 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java @@ -179,6 +179,7 @@ public class StreamLoadScanNode extends LoadScanNode { params.setLineDelimiter((byte) '\n'); params.setLineDelimiterLength(1); } + params.setTrimDoubleQuotes(taskInfo.getTrimDoubleQuotes()); params.setDestTupleId(desc.getId().asInt()); brokerScanRange.setParams(params); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java index 8f7ea2bb6e..ae2bf5d4ff 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java @@ -131,6 +131,7 @@ public class LoadScanProvider implements FileScanProviderIf { fileAttributes.setReadJsonByLine(fileGroup.isReadJsonByLine()); fileAttributes.setReadByColumnDef(true); fileAttributes.setHeaderType(getHeaderType(fileGroup.getFileFormat())); + fileAttributes.setTrimDoubleQuotes(fileGroup.getTrimDoubleQuotes()); } private String getHeaderType(String formatType) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java index 642597246f..e384394653 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java @@ -97,6 +97,10 @@ public interface LoadTaskInfo { List<String> getHiddenColumns(); + default boolean getTrimDoubleQuotes() { + return false; + } + class ImportColumnDescs { public List<ImportColumnDesc> descs = Lists.newArrayList(); public boolean isColumnDescsRewrited = false; diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java index 59393b7d01..2a80ceef90 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java @@ -82,6 +82,7 @@ public class StreamLoadTask implements LoadTaskInfo { private boolean loadToSingleTablet = false; private String headerType = ""; private List<String> hiddenColumns; + private boolean trimDoubleQuotes = false; public StreamLoadTask(TUniqueId id, long txnId, TFileType fileType, TFileFormatType formatType, TFileCompressType compressType) { @@ -251,6 +252,11 @@ public class StreamLoadTask implements LoadTaskInfo { return hiddenColumns; } + @Override + public boolean getTrimDoubleQuotes() { + return trimDoubleQuotes; + } + public static StreamLoadTask fromTStreamLoadPutRequest(TStreamLoadPutRequest request) throws UserException { StreamLoadTask streamLoadTask = new StreamLoadTask(request.getLoadId(), request.getTxnId(), request.getFileType(), request.getFormatType(), @@ -350,6 +356,9 @@ public class StreamLoadTask implements LoadTaskInfo { if (request.isSetHiddenColumns()) { hiddenColumns = Arrays.asList(request.getHiddenColumns().replaceAll("\\s+", "").split(",")); } + if (request.isSetTrimDoubleQuotes()) { + trimDoubleQuotes = request.isTrimDoubleQuotes(); + } } // used for stream load diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 3036c1d53f..4c7d062727 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -546,6 +546,7 @@ struct TStreamLoadPutRequest { 39: optional string hidden_columns 40: optional PlanNodes.TFileCompressType compress_type 41: optional i64 file_size // only for stream load with parquet or orc + 42: optional bool trim_double_quotes // trim double quotes for csv } struct TStreamLoadPutResult { diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index b14610587a..a7a21a0499 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -211,6 +211,8 @@ struct TBrokerScanRangeParams { 12: optional i32 line_delimiter_length = 1; 13: optional string column_separator_str; 14: optional string line_delimiter_str; + // trim double quotes for csv + 15: optional bool trim_double_quotes; } @@ -255,6 +257,8 @@ struct TFileAttributes { 8: optional bool read_by_column_def; // csv with header type 9: optional string header_type; + // trim double quotes for csv + 10: optional bool trim_double_quotes; } struct TIcebergDeleteFileDesc { diff --git a/regression-test/data/load_p0/stream_load/csv_with_double_quotes.csv b/regression-test/data/load_p0/stream_load/csv_with_double_quotes.csv new file mode 100644 index 0000000000..44ae3f4550 --- /dev/null +++ b/regression-test/data/load_p0/stream_load/csv_with_double_quotes.csv @@ -0,0 +1,8 @@ +1,2,3,abc,2022-12-01,2022-12-01:09:30:31 +2,3,3,abc,2022-12-01,2022-12-01:09:30:31 +3,4,3,abc,2022-12-01,2022-12-01:09:30:31 +4,5,3,abc,2022-12-01,2022-12-01:09:30:31 +"5","6","3","abc","2022-12-01","2022-12-01:09:30:31" +"6","7","3","abc","2022-12-01","2022-12-01:09:30:31" +"7","8","3","abc","2022-12-01","2022-12-01:09:30:31" +"8","9","3","abc","2022-12-01","2022-12-01:09:30:31" diff --git a/regression-test/data/load_p0/stream_load/test_csv_with_double_quotes.out b/regression-test/data/load_p0/stream_load/test_csv_with_double_quotes.out new file mode 100644 index 0000000000..0ae5ebe7f7 --- /dev/null +++ b/regression-test/data/load_p0/stream_load/test_csv_with_double_quotes.out @@ -0,0 +1,21 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +\N \N \N "abc" \N \N +\N \N \N "abc" \N \N +\N \N \N "abc" \N \N +\N \N \N "abc" \N \N +1 2 3 abc 2022-12-01 2022-12-01T09:30:31 +2 3 3 abc 2022-12-01 2022-12-01T09:30:31 +3 4 3 abc 2022-12-01 2022-12-01T09:30:31 +4 5 3 abc 2022-12-01 2022-12-01T09:30:31 + +-- !sql -- +1 2 3 abc 2022-12-01 2022-12-01T09:30:31 +2 3 3 abc 2022-12-01 2022-12-01T09:30:31 +3 4 3 abc 2022-12-01 2022-12-01T09:30:31 +4 5 3 abc 2022-12-01 2022-12-01T09:30:31 +5 6 3 abc 2022-12-01 2022-12-01T09:30:31 +6 7 3 abc 2022-12-01 2022-12-01T09:30:31 +7 8 3 abc 2022-12-01 2022-12-01T09:30:31 +8 9 3 abc 2022-12-01 2022-12-01T09:30:31 + diff --git a/regression-test/suites/load_p0/stream_load/test_csv_with_double_quotes.groovy b/regression-test/suites/load_p0/stream_load/test_csv_with_double_quotes.groovy new file mode 100644 index 0000000000..429e8c88fd --- /dev/null +++ b/regression-test/suites/load_p0/stream_load/test_csv_with_double_quotes.groovy @@ -0,0 +1,64 @@ + +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_csv_with_double_quotes", "p0") { + def tableName = "test_csv_with_double_quotes" + + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `k1` int(20) NULL, + `k2` bigint(20) NULL, + `v1` tinyint(4) NULL, + `v2` string NULL, + `v3` date NULL, + `v4` datetime NULL + ) ENGINE=OLAP + DUPLICATE KEY(`k1`, `k2`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 3 + PROPERTIES ("replication_allocation" = "tag.location.default: 1"); + """ + + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + + file 'csv_with_double_quotes.csv' + time 10000 // limit inflight 10s + } + + sql "sync" + qt_sql "select * from ${tableName} order by k1, k2" + + sql """truncate table ${tableName}""" + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'trim_double_quotes', 'true' + + file 'csv_with_double_quotes.csv' + time 10000 // limit inflight 10s + } + + sql "sync" + qt_sql "select * from ${tableName} order by k1, k2" + sql """ DROP TABLE IF EXISTS ${tableName} """ +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org