This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new bd24a8bdd9b [Fix](csv_reader) Add a session variable to control 
whether empty rows in CSV files are read as NULL values (#37153)
bd24a8bdd9b is described below

commit bd24a8bdd9b7e327fc2ded62bc6fc30092c46c27
Author: Tiewei Fang <43782773+bepppo...@users.noreply.github.com>
AuthorDate: Tue Jul 2 22:12:17 2024 +0800

    [Fix](csv_reader) Add a session variable to control whether empty rows in 
CSV files are read as NULL values (#37153)
    
    bp: #36668
---
 be/src/runtime/runtime_state.h                     |   5 +
 be/src/vec/exec/format/csv/csv_reader.cpp          |  25 ++++-
 be/src/vec/exec/format/csv/csv_reader.h            |   1 +
 .../java/org/apache/doris/qe/SessionVariable.java  |   8 ++
 gensrc/thrift/PaloInternalService.thrift           |   1 +
 .../tvf/test_read_csv_empty_line_as_null.out       |  31 ++++++
 .../tvf/test_read_csv_empty_line_as_null.groovy    | 111 +++++++++++++++++++++
 7 files changed, 180 insertions(+), 2 deletions(-)

diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 33b5ded9c3a..b88b29ee8d0 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -518,6 +518,11 @@ public:
         return _query_options.__isset.enable_parallel_scan && _query_options.enable_parallel_scan;
     }
 
+    bool is_read_csv_empty_line_as_null() const {
+        return _query_options.__isset.read_csv_empty_line_as_null &&
+               _query_options.read_csv_empty_line_as_null;
+    }
+
     int parallel_scan_max_scanners_count() const {
         return _query_options.__isset.parallel_scan_max_scanners_count
                        ? _query_options.parallel_scan_max_scanners_count
diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp
index a10ba8c3d14..7894b5c57ae 100644
--- a/be/src/vec/exec/format/csv/csv_reader.cpp
+++ b/be/src/vec/exec/format/csv/csv_reader.cpp
@@ -485,7 +485,10 @@ Status CsvReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
                 continue;
             }
             if (size == 0) {
-                // Read empty row, just continue
+                if (!_line_reader_eof && _state->is_read_csv_empty_line_as_null()) {
+                    ++rows;
+                }
+                // Read empty line, continue
                 continue;
             }
 
@@ -518,7 +521,10 @@ Status CsvReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
                 continue;
             }
             if (size == 0) {
-                // Read empty row, just continue
+                if (!_line_reader_eof && _state->is_read_csv_empty_line_as_null()) {
+                    RETURN_IF_ERROR(_fill_empty_line(block, columns, &rows));
+                }
+                // Read empty line, continue
                 continue;
             }
 
@@ -661,6 +667,21 @@ Status CsvReader::_fill_dest_columns(const Slice& line, Block* block,
     return Status::OK();
 }
 
+Status CsvReader::_fill_empty_line(Block* block, std::vector<MutableColumnPtr>& columns,
+                                   size_t* rows) {
+    for (int i = 0; i < _file_slot_descs.size(); ++i) {
+        IColumn* col_ptr = columns[i];
+        if (!_is_load) {
+            col_ptr = const_cast<IColumn*>(
+                    block->get_by_position(_file_slot_idx_map[i]).column.get());
+        }
+        auto& null_column = assert_cast<ColumnNullable&>(*col_ptr);
+        null_column.insert_data(nullptr, 0);
+    }
+    ++(*rows);
+    return Status::OK();
+}
+
 Status CsvReader::_validate_line(const Slice& line, bool* success) {
     if (!_is_proto_format && !validate_utf8(line.data, line.size)) {
         if (!_is_load) {
diff --git a/be/src/vec/exec/format/csv/csv_reader.h b/be/src/vec/exec/format/csv/csv_reader.h
index d9c8633f427..65eba62a54c 100644
--- a/be/src/vec/exec/format/csv/csv_reader.h
+++ b/be/src/vec/exec/format/csv/csv_reader.h
@@ -204,6 +204,7 @@ private:
     Status _create_decompressor();
     Status _fill_dest_columns(const Slice& line, Block* block,
                               std::vector<MutableColumnPtr>& columns, size_t* rows);
+    Status _fill_empty_line(Block* block, std::vector<MutableColumnPtr>& columns, size_t* rows);
     Status _line_split_to_values(const Slice& line, bool* success);
     void _split_line(const Slice& line);
     Status _check_array_format(std::vector<Slice>& split_values, bool* is_success);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index d6e75faf673..5cf6cb901d5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -194,6 +194,8 @@ public class SessionVariable implements Serializable, Writable {
 
     public static final String ENABLE_SYNC_RUNTIME_FILTER_SIZE = "enable_sync_runtime_filter_size";
 
+    public static final String READ_CSV_EMPTY_LINE_AS_NULL = "read_csv_empty_line_as_null";
+
     public static final String BE_NUMBER_FOR_TEST = "be_number_for_test";
 
     // max ms to wait transaction publish finish when exec insert stmt.
@@ -1034,6 +1036,11 @@ public class SessionVariable implements Serializable, Writable {
     @VariableMgr.VarAttr(name = ENABLE_SYNC_RUNTIME_FILTER_SIZE, needForward = true)
     private boolean enableSyncRuntimeFilterSize = true;
 
+    @VariableMgr.VarAttr(name = READ_CSV_EMPTY_LINE_AS_NULL, needForward = true,
+            description = {"在读取csv文件时是否读取csv的空行为null",
+                    "Determine whether to read empty rows in CSV files as NULL when reading CSV files."})
+    public boolean readCsvEmptyLineAsNull = false;
+
     @VariableMgr.VarAttr(name = USE_RF_DEFAULT)
     public boolean useRuntimeFilterDefaultSize = false;
 
@@ -3327,6 +3334,7 @@ public class SessionVariable implements Serializable, Writable {
         tResult.setMinRevocableMem(minRevocableMem);
         tResult.setDataQueueMaxBlocks(dataQueueMaxBlocks);
 
+        tResult.setReadCsvEmptyLineAsNull(readCsvEmptyLineAsNull);
         return tResult;
     }
 
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index 35331f13fc7..aaedd219fb2 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -297,6 +297,7 @@ struct TQueryOptions {
 
   113: optional bool enable_force_spill = false;
   
+  117: optional bool read_csv_empty_line_as_null = false
   // For cloud, to control if the content would be written into file cache
   1000: optional bool disable_file_cache = false
 }
diff --git a/regression-test/data/external_table_p0/tvf/test_read_csv_empty_line_as_null.out b/regression-test/data/external_table_p0/tvf/test_read_csv_empty_line_as_null.out
new file mode 100644
index 00000000000..6fbf970ff67
--- /dev/null
+++ b/regression-test/data/external_table_p0/tvf/test_read_csv_empty_line_as_null.out
@@ -0,0 +1,31 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !select_base --
+1,doris1,16
+2,doris2,18
+
+3,doris3,19
+
+
+
+
+4,doris4,20
+
+
+-- !select_1 --
+1      doris1  16
+2      doris2  18
+3      doris3  19
+4      doris4  20
+
+-- !select_1 --
+\N     \N      \N
+\N     \N      \N
+\N     \N      \N
+\N     \N      \N
+\N     \N      \N
+\N     \N      \N
+1      doris1  16
+2      doris2  18
+3      doris3  19
+4      doris4  20
+
diff --git a/regression-test/suites/external_table_p0/tvf/test_read_csv_empty_line_as_null.groovy b/regression-test/suites/external_table_p0/tvf/test_read_csv_empty_line_as_null.groovy
new file mode 100644
index 00000000000..0a64109eb22
--- /dev/null
+++ b/regression-test/suites/external_table_p0/tvf/test_read_csv_empty_line_as_null.groovy
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_read_csv_empty_line_as_null", "p0") {
+    // open nereids
+    sql """ set enable_nereids_planner=true """
+    sql """ set enable_fallback_to_original_planner=false """
+
+    String ak = getS3AK()
+    String sk = getS3SK()
+    String s3_endpoint = getS3Endpoint()
+    String region = getS3Region()
+    String bucket = context.config.otherConfigs.get("s3BucketName");
+
+
+    def export_table_name = "test_read_csv_empty_line"
+    def outFilePath = "${bucket}/test_read_csv_empty_line/exp_"
+
+
+    def create_table = {table_name ->
+        sql """ DROP TABLE IF EXISTS ${table_name} """
+        sql """
+        CREATE TABLE IF NOT EXISTS ${table_name} (
+                `id` INT NULL,
+                `content` varchar(32) NULL
+            )
+            DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1");
+        """
+    }
+
+    def outfile_to_S3 = {
+        // select ... into outfile ...
+        def res = sql """
+            SELECT content FROM ${export_table_name} t ORDER BY id
+            INTO OUTFILE "s3://${outFilePath}"
+            FORMAT AS csv
+            PROPERTIES (
+                "s3.endpoint" = "${s3_endpoint}",
+                "s3.region" = "${region}",
+                "s3.secret_key"="${sk}",
+                "s3.access_key" = "${ak}"
+            );
+        """
+
+        return res[0][3]
+    }
+
+    // create table to export data
+    create_table(export_table_name)
+
+    // insert data
+    sql """ insert into ${export_table_name} values (1, "1,doris1,16"); """
+    sql """ insert into ${export_table_name} values (2, "2,doris2,18"); """
+    sql """ insert into ${export_table_name} values (3, ""); """
+    sql """ insert into ${export_table_name} values (4, "3,doris3,19"); """
+    sql """ insert into ${export_table_name} values (5, ""); """
+    sql """ insert into ${export_table_name} values (6, ""); """
+    sql """ insert into ${export_table_name} values (7, ""); """
+    sql """ insert into ${export_table_name} values (8, ""); """
+    sql """ insert into ${export_table_name} values (9, "4,doris4,20"); """
+    sql """ insert into ${export_table_name} values (10, ""); """
+
+    // test base data
+    qt_select_base """ SELECT content FROM ${export_table_name} t ORDER BY id; """
+
+    // test outfile to s3
+    def outfile_url = outfile_to_S3()
+
+    // test read_csv_empty_line_as_null = false
+    try {
+        order_qt_select_1 """ SELECT * FROM S3 (
+                            "uri" = "http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(), outfile_url.length() - 1)}0.csv",
+                            "ACCESS_KEY"= "${ak}",
+                            "SECRET_KEY" = "${sk}",
+                            "format" = "csv",
+                            "column_separator" = ",",
+                            "region" = "${region}"
+                        );
+                        """
+    } finally {
+    }
+
+    // test read_csv_empty_line_as_null = true
+    try {
+        sql """ set read_csv_empty_line_as_null=true; """
+        order_qt_select_1 """ SELECT * FROM S3 (
+                            "uri" = "http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(), outfile_url.length() - 1)}0.csv",
+                            "ACCESS_KEY"= "${ak}",
+                            "SECRET_KEY" = "${sk}",
+                            "format" = "csv",
+                            "column_separator" = ",",
+                            "region" = "${region}"
+                        );
+                        """
+    } finally {
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to