This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new bd24a8bdd9b [Fix](csv_reader) Add a session variable to control whether empty rows in CSV files are read as NULL values (#37153) bd24a8bdd9b is described below commit bd24a8bdd9b7e327fc2ded62bc6fc30092c46c27 Author: Tiewei Fang <43782773+bepppo...@users.noreply.github.com> AuthorDate: Tue Jul 2 22:12:17 2024 +0800 [Fix](csv_reader) Add a session variable to control whether empty rows in CSV files are read as NULL values (#37153) bp: #36668 --- be/src/runtime/runtime_state.h | 5 + be/src/vec/exec/format/csv/csv_reader.cpp | 25 ++++- be/src/vec/exec/format/csv/csv_reader.h | 1 + .../java/org/apache/doris/qe/SessionVariable.java | 8 ++ gensrc/thrift/PaloInternalService.thrift | 1 + .../tvf/test_read_csv_empty_line_as_null.out | 31 ++++++ .../tvf/test_read_csv_empty_line_as_null.groovy | 111 +++++++++++++++++++++ 7 files changed, 180 insertions(+), 2 deletions(-) diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 33b5ded9c3a..b88b29ee8d0 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -518,6 +518,11 @@ public: return _query_options.__isset.enable_parallel_scan && _query_options.enable_parallel_scan; } + bool is_read_csv_empty_line_as_null() const { + return _query_options.__isset.read_csv_empty_line_as_null && + _query_options.read_csv_empty_line_as_null; + } + int parallel_scan_max_scanners_count() const { return _query_options.__isset.parallel_scan_max_scanners_count ? _query_options.parallel_scan_max_scanners_count diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp index a10ba8c3d14..7894b5c57ae 100644 --- a/be/src/vec/exec/format/csv/csv_reader.cpp +++ b/be/src/vec/exec/format/csv/csv_reader.cpp @@ -485,7 +485,10 @@ Status CsvReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { continue; } if (size == 0) { - // Read empty row, just continue + if (!_line_reader_eof && _state->is_read_csv_empty_line_as_null()) { + ++rows; + } + // Read empty line, continue continue; } @@ -518,7 +521,10 @@ Status CsvReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { continue; } if (size == 0) { - // Read empty row, just continue + if (!_line_reader_eof && _state->is_read_csv_empty_line_as_null()) { + RETURN_IF_ERROR(_fill_empty_line(block, columns, &rows)); + } + // Read empty line, continue continue; } @@ -661,6 +667,21 @@ Status CsvReader::_fill_dest_columns(const Slice& line, Block* block, return Status::OK(); } +Status CsvReader::_fill_empty_line(Block* block, std::vector<MutableColumnPtr>& columns, + size_t* rows) { + for (int i = 0; i < _file_slot_descs.size(); ++i) { + IColumn* col_ptr = columns[i]; + if (!_is_load) { + col_ptr = const_cast<IColumn*>( + block->get_by_position(_file_slot_idx_map[i]).column.get()); + } + auto& null_column = assert_cast<ColumnNullable&>(*col_ptr); + null_column.insert_data(nullptr, 0); + } + ++(*rows); + return Status::OK(); +} + Status CsvReader::_validate_line(const Slice& line, bool* success) { if (!_is_proto_format && !validate_utf8(line.data, line.size)) { if (!_is_load) { diff --git a/be/src/vec/exec/format/csv/csv_reader.h b/be/src/vec/exec/format/csv/csv_reader.h index d9c8633f427..65eba62a54c 100644 --- a/be/src/vec/exec/format/csv/csv_reader.h +++ b/be/src/vec/exec/format/csv/csv_reader.h @@ -204,6 +204,7 @@ private: Status _create_decompressor(); Status _fill_dest_columns(const Slice& line, Block* block, std::vector<MutableColumnPtr>& columns, size_t* rows); + Status _fill_empty_line(Block* block, std::vector<MutableColumnPtr>& columns, size_t* rows); Status _line_split_to_values(const Slice& line, bool* success); void _split_line(const Slice& line); Status _check_array_format(std::vector<Slice>& split_values, bool* is_success); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index d6e75faf673..5cf6cb901d5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -194,6 +194,8 @@ public class SessionVariable implements Serializable, Writable { public static final String ENABLE_SYNC_RUNTIME_FILTER_SIZE = "enable_sync_runtime_filter_size"; + public static final String READ_CSV_EMPTY_LINE_AS_NULL = "read_csv_empty_line_as_null"; + public static final String BE_NUMBER_FOR_TEST = "be_number_for_test"; // max ms to wait transaction publish finish when exec insert stmt. @@ -1034,6 +1036,11 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = ENABLE_SYNC_RUNTIME_FILTER_SIZE, needForward = true) private boolean enableSyncRuntimeFilterSize = true; + @VariableMgr.VarAttr(name = READ_CSV_EMPTY_LINE_AS_NULL, needForward = true, + description = {"在读取csv文件时是否读取csv的空行为null", + "Determine whether to read empty rows in CSV files as NULL when reading CSV files."}) + public boolean readCsvEmptyLineAsNull = false; + @VariableMgr.VarAttr(name = USE_RF_DEFAULT) public boolean useRuntimeFilterDefaultSize = false; @@ -3327,6 +3334,7 @@ public class SessionVariable implements Serializable, Writable { tResult.setMinRevocableMem(minRevocableMem); tResult.setDataQueueMaxBlocks(dataQueueMaxBlocks); + tResult.setReadCsvEmptyLineAsNull(readCsvEmptyLineAsNull); return tResult; } diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 35331f13fc7..aaedd219fb2 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -297,6 +297,7 @@ struct TQueryOptions { 113: optional bool enable_force_spill = false; + 117: optional bool read_csv_empty_line_as_null = false // For cloud, to control if the content would be written into file cache 1000: optional bool disable_file_cache = false } diff --git a/regression-test/data/external_table_p0/tvf/test_read_csv_empty_line_as_null.out b/regression-test/data/external_table_p0/tvf/test_read_csv_empty_line_as_null.out new file mode 100644 index 00000000000..6fbf970ff67 --- /dev/null +++ b/regression-test/data/external_table_p0/tvf/test_read_csv_empty_line_as_null.out @@ -0,0 +1,31 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select_base -- +1,doris1,16 +2,doris2,18 + +3,doris3,19 + + + + +4,doris4,20 + + +-- !select_1 -- +1 doris1 16 +2 doris2 18 +3 doris3 19 +4 doris4 20 + +-- !select_1 -- +\N \N \N +\N \N \N +\N \N \N +\N \N \N +\N \N \N +\N \N \N +1 doris1 16 +2 doris2 18 +3 doris3 19 +4 doris4 20 + diff --git a/regression-test/suites/external_table_p0/tvf/test_read_csv_empty_line_as_null.groovy b/regression-test/suites/external_table_p0/tvf/test_read_csv_empty_line_as_null.groovy new file mode 100644 index 00000000000..0a64109eb22 --- /dev/null +++ b/regression-test/suites/external_table_p0/tvf/test_read_csv_empty_line_as_null.groovy @@ -0,0 +1,111 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_read_csv_empty_line_as_null", "p0") { + // open nereids + sql """ set enable_nereids_planner=true """ + sql """ set enable_fallback_to_original_planner=false """ + + String ak = getS3AK() + String sk = getS3SK() + String s3_endpoint = getS3Endpoint() + String region = getS3Region() + String bucket = context.config.otherConfigs.get("s3BucketName"); + + + def export_table_name = "test_read_csv_empty_line" + def outFilePath = "${bucket}/test_read_csv_empty_line/exp_" + + + def create_table = {table_name -> + sql """ DROP TABLE IF EXISTS ${table_name} """ + sql """ + CREATE TABLE IF NOT EXISTS ${table_name} ( + `id` INT NULL, + `content` varchar(32) NULL + ) + DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1"); + """ + } + + def outfile_to_S3 = { + // select ... into outfile ... + def res = sql """ + SELECT content FROM ${export_table_name} t ORDER BY id + INTO OUTFILE "s3://${outFilePath}" + FORMAT AS csv + PROPERTIES ( + "s3.endpoint" = "${s3_endpoint}", + "s3.region" = "${region}", + "s3.secret_key"="${sk}", + "s3.access_key" = "${ak}" + ); + """ + + return res[0][3] + } + + // create table to export data + create_table(export_table_name) + + // insert data + sql """ insert into ${export_table_name} values (1, "1,doris1,16"); """ + sql """ insert into ${export_table_name} values (2, "2,doris2,18"); """ + sql """ insert into ${export_table_name} values (3, ""); """ + sql """ insert into ${export_table_name} values (4, "3,doris3,19"); """ + sql """ insert into ${export_table_name} values (5, ""); """ + sql """ insert into ${export_table_name} values (6, ""); """ + sql """ insert into ${export_table_name} values (7, ""); """ + sql """ insert into ${export_table_name} values (8, ""); """ + sql """ insert into ${export_table_name} values (9, "4,doris4,20"); """ + sql """ insert into ${export_table_name} values (10, ""); """ + + // test base data + qt_select_base """ SELECT content FROM ${export_table_name} t ORDER BY id; """ + + // test outfile to s3 + def outfile_url = outfile_to_S3() + + // test read_csv_empty_line_as_null = false + try { + order_qt_select_1 """ SELECT * FROM S3 ( + "uri" = "http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(), outfile_url.length() - 1)}0.csv", + "ACCESS_KEY"= "${ak}", + "SECRET_KEY" = "${sk}", + "format" = "csv", + "column_separator" = ",", + "region" = "${region}" + ); + """ + } finally { + } + + // test read_csv_empty_line_as_null = true + try { + sql """ set read_csv_empty_line_as_null=true; """ + order_qt_select_1 """ SELECT * FROM S3 ( + "uri" = "http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(), outfile_url.length() - 1)}0.csv", + "ACCESS_KEY"= "${ak}", + "SECRET_KEY" = "${sk}", + "format" = "csv", + "column_separator" = ",", + "region" = "${region}" + ); + """ + } finally { + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org