This is an automated email from the ASF dual-hosted git repository.

liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new efd3746f907 [feat](csv reader) support empty field as null (#55041)
efd3746f907 is described below

commit efd3746f907fef252007529822b1c92360ac9e5f
Author: hui lai <[email protected]>
AuthorDate: Sat Sep 6 21:14:36 2025 +0800

    [feat](csv reader) support empty field as null (#55041)
    
    Allow users to load empty CSV fields as NULL by setting the
    `empty_field_as_null` header (or load property). For example:
    
    create table:
    ```
    CREATE TABLE IF NOT EXISTS test_stream_load_empty_field_as_null (
                `k1` int(20) NULL,
                `k2` string NULL,
                `v1` date  NULL,
                `v2` string  NULL,
                `v3` datetime  NULL,
                `v4` string  NULL
            ) ENGINE=OLAP
            DUPLICATE KEY(`k1`)
            COMMENT 'OLAP'
            DISTRIBUTED BY HASH(`k1`) BUCKETS 3
            PROPERTIES ("replication_allocation" = "tag.location.default: 1");
    ```
    
    data
    ```
    10,,2023-07-15,def,2023-07-20T05:48:31,ghi
    ```
    
    command
    ```
    curl --location-trusted -u root:"" -H "empty_field_as_null:true" -T data.csv -H "column_separator:," http://127.0.0.1:8030/api/db/test/_stream_load
    ```
    
    result
    ```
    10      \N      2023-07-15      def     2023-07-20T05:48:31     ghi
    ```
---
 be/src/http/action/stream_load.cpp                 |   6 +
 be/src/http/http_common.h                          |   1 +
 be/src/vec/exec/format/csv/csv_reader.cpp          |  10 ++
 be/src/vec/exec/format/csv/csv_reader.h            |   1 +
 .../fileformat/CsvFileFormatProperties.java        |  11 ++
 .../doris/load/routineload/RoutineLoadJob.java     |  13 ++
 .../doris/nereids/load/NereidsBrokerLoadTask.java  |   5 +
 .../doris/nereids/load/NereidsDataDescription.java |   2 +
 .../doris/nereids/load/NereidsLoadTaskInfo.java    |   2 +
 .../nereids/load/NereidsRoutineLoadTaskInfo.java   |  11 ++
 .../doris/nereids/load/NereidsStreamLoadTask.java  |  14 ++
 .../plans/commands/AlterRoutineLoadCommand.java    |   8 +
 .../plans/commands/info/CreateRoutineLoadInfo.java |   1 +
 gensrc/thrift/FrontendService.thrift               |   1 +
 gensrc/thrift/PlanNodes.thrift                     |   1 +
 .../test_bulk_load_empty_field_as_null.out         | Bin 0 -> 196 bytes
 .../test_routine_load_empty_field_as_null.out      | Bin 0 -> 305 bytes
 .../load_p0/stream_load/empty_field_as_null.csv    |   2 +
 .../test_stream_load_empty_field_as_null.out       | Bin 0 -> 305 bytes
 .../load_p0/tvf/test_tvf_empty_field_as_null.out   | Bin 0 -> 199 bytes
 .../test_bulk_load_empty_field_as_null.groovy      |  82 ++++++++++
 .../test_routine_load_empty_field_as_null.groovy   | 173 +++++++++++++++++++++
 .../test_stream_load_empty_field_as_null.groovy    |  63 ++++++++
 .../tvf/test_tvf_empty_field_as_null.groovy        |  52 +++++++
 24 files changed, 459 insertions(+)

diff --git a/be/src/http/action/stream_load.cpp 
b/be/src/http/action/stream_load.cpp
index f8f720d6b66..d8175b17f86 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -744,6 +744,12 @@ Status StreamLoadAction::_process_put(HttpRequest* 
http_req,
         request.__set_cloud_cluster(http_req->header(HTTP_CLOUD_CLUSTER));
     }
 
+    if (!http_req->header(HTTP_EMPTY_FIELD_AS_NULL).empty()) {
+        if (iequal(http_req->header(HTTP_EMPTY_FIELD_AS_NULL), "true")) {
+            request.__set_empty_field_as_null(true);
+        }
+    }
+
 #ifndef BE_TEST
     // plan this load
     TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr;
diff --git a/be/src/http/http_common.h b/be/src/http/http_common.h
index 4c856ba4478..7719070cb24 100644
--- a/be/src/http/http_common.h
+++ b/be/src/http/http_common.h
@@ -71,5 +71,6 @@ static const std::string HTTP_WAL_ID_KY = "wal_id";
 static const std::string HTTP_AUTH_CODE = "auth_code"; // deprecated
 static const std::string HTTP_GROUP_COMMIT = "group_commit";
 static const std::string HTTP_CLOUD_CLUSTER = "cloud_cluster";
+static const std::string HTTP_EMPTY_FIELD_AS_NULL = "empty_field_as_null";
 
 } // namespace doris
diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp 
b/be/src/vec/exec/format/csv/csv_reader.cpp
index 0f5ab9648ad..0e164e1c399 100644
--- a/be/src/vec/exec/format/csv/csv_reader.cpp
+++ b/be/src/vec/exec/format/csv/csv_reader.cpp
@@ -458,6 +458,12 @@ Status 
CsvReader::get_parsed_schema(std::vector<std::string>* col_names,
 
 Status CsvReader::_deserialize_nullable_string(IColumn& column, Slice& slice) {
     auto& null_column = assert_cast<ColumnNullable&>(column);
+    if (_empty_field_as_null) {
+        if (slice.size == 0) {
+            null_column.insert_data(nullptr, 0);
+            return Status::OK();
+        }
+    }
     if (_options.null_len > 0 && !(_options.converted_from_string && 
slice.trim_double_quotes())) {
         if (slice.compare(Slice(_options.null_format, _options.null_len)) == 
0) {
             null_column.insert_data(nullptr, 0);
@@ -520,6 +526,10 @@ Status CsvReader::_init_options() {
     if (_state != nullptr) {
         _keep_cr = _state->query_options().keep_carriage_return;
     }
+
+    if (_params.file_attributes.text_params.__isset.empty_field_as_null) {
+        _empty_field_as_null = 
_params.file_attributes.text_params.empty_field_as_null;
+    }
     return Status::OK();
 }
 
diff --git a/be/src/vec/exec/format/csv/csv_reader.h 
b/be/src/vec/exec/format/csv/csv_reader.h
index e28c56cbdc5..fcdc5e5606f 100644
--- a/be/src/vec/exec/format/csv/csv_reader.h
+++ b/be/src/vec/exec/format/csv/csv_reader.h
@@ -275,6 +275,7 @@ private:
     bool _trim_double_quotes = false;
     bool _trim_tailing_spaces = false;
     bool _keep_cr = false;
+    bool _empty_field_as_null = false;
 
     io::IOContext* _io_ctx = nullptr;
     // save source text which have been splitted.
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatProperties.java
index 55e545b4fa8..57ac88e6b2a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatProperties.java
@@ -67,6 +67,8 @@ public class CsvFileFormatProperties extends 
FileFormatProperties {
 
     public static final String PROP_ENABLE_TEXT_VALIDATE_UTF8 = 
"enable_text_validate_utf8";
 
+    public static final String PROP_EMPTY_FIELD_AS_NULL = 
"empty_field_as_null";
+
     private String headerType = "";
     private String columnSeparator = DEFAULT_COLUMN_SEPARATOR;
     private String lineDelimiter = DEFAULT_LINE_DELIMITER;
@@ -75,6 +77,7 @@ public class CsvFileFormatProperties extends 
FileFormatProperties {
     private byte enclose;
     private byte escape;
     private boolean enableTextValidateUTF8 = true;
+    private boolean emptyFieldAsNull;
 
     public CsvFileFormatProperties(String formatName) {
         super(TFileFormatType.FORMAT_CSV_PLAIN, formatName);
@@ -149,6 +152,9 @@ public class CsvFileFormatProperties extends 
FileFormatProperties {
                 enableTextValidateUTF8 = Boolean.parseBoolean(validateUtf8);
             }
 
+            emptyFieldAsNull = Boolean.valueOf(getOrDefault(formatProperties,
+                    PROP_EMPTY_FIELD_AS_NULL, "false", isRemoveOriginProperty))
+                    .booleanValue();
         } catch (org.apache.doris.common.AnalysisException e) {
             throw new AnalysisException(e.getMessage());
         }
@@ -185,6 +191,7 @@ public class CsvFileFormatProperties extends 
FileFormatProperties {
         fileTextScanRangeParams.setLineDelimiter(this.lineDelimiter);
         fileTextScanRangeParams.setEnclose(this.enclose);
         fileTextScanRangeParams.setEscape(this.escape);
+        fileTextScanRangeParams.setEmptyFieldAsNull(this.emptyFieldAsNull);
         fileAttributes.setTextParams(fileTextScanRangeParams);
         fileAttributes.setHeaderType(headerType);
         fileAttributes.setTrimDoubleQuotes(trimDoubleQuotes);
@@ -220,4 +227,8 @@ public class CsvFileFormatProperties extends 
FileFormatProperties {
     public byte getEscape() {
         return escape;
     }
+
+    public boolean getEmptyFieldAsNull() {
+        return emptyFieldAsNull;
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 34cd9e5228f..3717b377ff2 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -278,6 +278,8 @@ public abstract class RoutineLoadJob
 
     protected byte escape = 0;
 
+    protected boolean emptyFieldAsNull = false;
+
     // use for cloud cluster mode
     @SerializedName("ccn")
     protected String cloudCluster;
@@ -397,8 +399,11 @@ public abstract class RoutineLoadJob
                     new String(new 
byte[]{csvFileFormatProperties.getEnclose()}));
             jobProperties.put(CsvFileFormatProperties.PROP_ESCAPE,
                     new String(new 
byte[]{csvFileFormatProperties.getEscape()}));
+            jobProperties.put(CsvFileFormatProperties.PROP_EMPTY_FIELD_AS_NULL,
+                    
String.valueOf(csvFileFormatProperties.getEmptyFieldAsNull()));
             this.enclose = csvFileFormatProperties.getEnclose();
             this.escape = csvFileFormatProperties.getEscape();
+            this.emptyFieldAsNull = 
csvFileFormatProperties.getEmptyFieldAsNull();
         } else if (fileFormatProperties instanceof JsonFileFormatProperties) {
             JsonFileFormatProperties jsonFileFormatProperties = 
(JsonFileFormatProperties) fileFormatProperties;
             jobProperties.put(FileFormatProperties.PROP_FORMAT, "json");
@@ -605,6 +610,14 @@ public abstract class RoutineLoadJob
         return escape;
     }
 
+    public boolean getEmptyFieldAsNull() {
+        String value = 
jobProperties.get(CsvFileFormatProperties.PROP_EMPTY_FIELD_AS_NULL);
+        if (value == null) {
+            return false;
+        }
+        return Boolean.parseBoolean(value);
+    }
+
     public boolean isStrictMode() {
         String value = jobProperties.get(LoadStmt.STRICT_MODE);
         if (value == null) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsBrokerLoadTask.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsBrokerLoadTask.java
index 6cf1f7d436f..7d28c0a9bdd 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsBrokerLoadTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsBrokerLoadTask.java
@@ -225,4 +225,9 @@ public class NereidsBrokerLoadTask implements 
NereidsLoadTaskInfo {
     public byte getEscape() {
         return 0;
     }
+
+    @Override
+    public boolean getEmptyFieldAsNull() {
+        return false;
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsDataDescription.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsDataDescription.java
index f6a43fc772d..eb1a727afdf 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsDataDescription.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsDataDescription.java
@@ -426,6 +426,8 @@ public class NereidsDataDescription {
                 String.valueOf(taskInfo.getTrimDoubleQuotes()));
         putAnalysisMapIfNonNull(CsvFileFormatProperties.PROP_SKIP_LINES,
                 String.valueOf(taskInfo.getSkipLines()));
+        
putAnalysisMapIfNonNull(CsvFileFormatProperties.PROP_EMPTY_FIELD_AS_NULL,
+                String.valueOf(taskInfo.getEmptyFieldAsNull()));
 
         
putAnalysisMapIfNonNull(JsonFileFormatProperties.PROP_STRIP_OUTER_ARRAY,
                 String.valueOf(taskInfo.isStripOuterArray()));
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadTaskInfo.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadTaskInfo.java
index 127c4e65806..0d15e3d9086 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadTaskInfo.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadTaskInfo.java
@@ -114,6 +114,8 @@ public interface NereidsLoadTaskInfo {
 
     boolean isFixedPartialUpdate();
 
+    boolean getEmptyFieldAsNull();
+
     default TUniqueKeyUpdateMode getUniqueKeyUpdateMode() {
         return TUniqueKeyUpdateMode.UPSERT;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsRoutineLoadTaskInfo.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsRoutineLoadTaskInfo.java
index e9ffb514d66..a9ecf76523c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsRoutineLoadTaskInfo.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsRoutineLoadTaskInfo.java
@@ -22,6 +22,7 @@ import org.apache.doris.analysis.PartitionNames;
 import org.apache.doris.analysis.Separator;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.datasource.property.fileformat.CsvFileFormatProperties;
 import org.apache.doris.load.loadv2.LoadTask;
 import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.thrift.TFileCompressType;
@@ -61,6 +62,7 @@ public class NereidsRoutineLoadTaskInfo implements 
NereidsLoadTaskInfo {
     protected Separator lineDelimiter;
     protected byte enclose;
     protected byte escape;
+    protected boolean emptyFieldAsNull;
     protected int sendBatchParallelism;
     protected boolean loadToSingleTablet;
     protected boolean isPartialUpdate;
@@ -267,6 +269,15 @@ public class NereidsRoutineLoadTaskInfo implements 
NereidsLoadTaskInfo {
         return escape;
     }
 
+    @Override
+    public boolean getEmptyFieldAsNull() {
+        String value = 
jobProperties.get(CsvFileFormatProperties.PROP_EMPTY_FIELD_AS_NULL);
+        if (value == null) {
+            return false;
+        }
+        return Boolean.parseBoolean(value);
+    }
+
     @Override
     public int getSendBatchParallelism() {
         return sendBatchParallelism;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsStreamLoadTask.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsStreamLoadTask.java
index 9e1ed849b5d..6b3fa1ec3ee 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsStreamLoadTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsStreamLoadTask.java
@@ -95,6 +95,8 @@ public class NereidsStreamLoadTask implements 
NereidsLoadTaskInfo {
 
     private String groupCommit;
 
+    private boolean emptyFieldAsNull = false;
+
     /**
      * NereidsStreamLoadTask
      */
@@ -335,6 +337,15 @@ public class NereidsStreamLoadTask implements 
NereidsLoadTaskInfo {
         this.streamPerNode = streamPerNode;
     }
 
+    @Override
+    public boolean getEmptyFieldAsNull() {
+        return emptyFieldAsNull;
+    }
+
+    public void setEmptyFieldAsNull(boolean emptyFieldAsNull) {
+        this.emptyFieldAsNull = emptyFieldAsNull;
+    }
+
     /**
      * fromTStreamLoadPutRequest
      */
@@ -500,6 +511,9 @@ public class NereidsStreamLoadTask implements 
NereidsLoadTaskInfo {
         if (request.isSetStreamPerNode()) {
             this.streamPerNode = request.getStreamPerNode();
         }
+        if (request.isSetEmptyFieldAsNull()) {
+            emptyFieldAsNull = request.isEmptyFieldAsNull();
+        }
     }
 
     // used for stream load
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommand.java
index b173863a3ce..ac3a4a0d367 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommand.java
@@ -80,6 +80,7 @@ public class AlterRoutineLoadCommand extends AlterCommand {
             .add(JsonFileFormatProperties.PROP_JSON_ROOT)
             .add(CsvFileFormatProperties.PROP_ENCLOSE)
             .add(CsvFileFormatProperties.PROP_ESCAPE)
+            .add(CsvFileFormatProperties.PROP_EMPTY_FIELD_AS_NULL)
             .build();
 
     private final LabelNameInfo labelNameInfo;
@@ -297,6 +298,13 @@ public class AlterRoutineLoadCommand extends AlterCommand {
             analyzedJobProperties.put(CsvFileFormatProperties.PROP_ESCAPE,
                     jobProperties.get(CsvFileFormatProperties.PROP_ESCAPE));
         }
+
+        if 
(jobProperties.containsKey(CsvFileFormatProperties.PROP_EMPTY_FIELD_AS_NULL)) {
+            boolean emptyFieldAsNull = Boolean.parseBoolean(
+                    
jobProperties.get(CsvFileFormatProperties.PROP_EMPTY_FIELD_AS_NULL));
+            
analyzedJobProperties.put(CsvFileFormatProperties.PROP_EMPTY_FIELD_AS_NULL,
+                    String.valueOf(emptyFieldAsNull));
+        }
     }
 
     private void checkDataSourceProperties() throws UserException {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java
index dacc706ea35..e5fe196799f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java
@@ -131,6 +131,7 @@ public class CreateRoutineLoadInfo {
             .add(JsonFileFormatProperties.PROP_JSON_ROOT)
             .add(CsvFileFormatProperties.PROP_ENCLOSE)
             .add(CsvFileFormatProperties.PROP_ESCAPE)
+            .add(CsvFileFormatProperties.PROP_EMPTY_FIELD_AS_NULL)
             .build();
 
     private static final Logger LOG = 
LogManager.getLogger(CreateRoutineLoadInfo.class);
diff --git a/gensrc/thrift/FrontendService.thrift 
b/gensrc/thrift/FrontendService.thrift
index ebad46253f7..87d8fc8d0cf 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -556,6 +556,7 @@ struct TStreamLoadPutRequest {
     56: optional string group_commit_mode
     57: optional Types.TUniqueKeyUpdateMode unique_key_update_mode
     58: optional Descriptors.TPartialUpdateNewRowPolicy 
partial_update_new_key_policy
+    59: optional bool empty_field_as_null
 
     // For cloud
     1000: optional string cloud_cluster
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 998f75cf40a..967eba42479 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -247,6 +247,7 @@ struct TFileTextScanRangeParams {
     5: optional i8 enclose;
     6: optional i8 escape;
     7: optional string null_format;
+    8: optional bool empty_field_as_null
 }
 
 struct TFileScanSlotInfo {
diff --git 
a/regression-test/data/load_p0/broker_load/test_bulk_load_empty_field_as_null.out
 
b/regression-test/data/load_p0/broker_load/test_bulk_load_empty_field_as_null.out
new file mode 100644
index 00000000000..9e07b24bbc8
Binary files /dev/null and 
b/regression-test/data/load_p0/broker_load/test_bulk_load_empty_field_as_null.out
 differ
diff --git 
a/regression-test/data/load_p0/routine_load/test_routine_load_empty_field_as_null.out
 
b/regression-test/data/load_p0/routine_load/test_routine_load_empty_field_as_null.out
new file mode 100644
index 00000000000..ffdd86573a0
Binary files /dev/null and 
b/regression-test/data/load_p0/routine_load/test_routine_load_empty_field_as_null.out
 differ
diff --git a/regression-test/data/load_p0/stream_load/empty_field_as_null.csv 
b/regression-test/data/load_p0/stream_load/empty_field_as_null.csv
new file mode 100644
index 00000000000..38a59c4335e
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/empty_field_as_null.csv
@@ -0,0 +1,2 @@
+9,\N,2023-07-15,def,2023-07-20T05:48:31,ghi
+10,,2023-07-15,def,2023-07-20T05:48:31,ghi
\ No newline at end of file
diff --git 
a/regression-test/data/load_p0/stream_load/test_stream_load_empty_field_as_null.out
 
b/regression-test/data/load_p0/stream_load/test_stream_load_empty_field_as_null.out
new file mode 100644
index 00000000000..ffdd86573a0
Binary files /dev/null and 
b/regression-test/data/load_p0/stream_load/test_stream_load_empty_field_as_null.out
 differ
diff --git a/regression-test/data/load_p0/tvf/test_tvf_empty_field_as_null.out 
b/regression-test/data/load_p0/tvf/test_tvf_empty_field_as_null.out
new file mode 100644
index 00000000000..00422d102af
Binary files /dev/null and 
b/regression-test/data/load_p0/tvf/test_tvf_empty_field_as_null.out differ
diff --git 
a/regression-test/suites/load_p0/broker_load/test_bulk_load_empty_field_as_null.groovy
 
b/regression-test/suites/load_p0/broker_load/test_bulk_load_empty_field_as_null.groovy
new file mode 100644
index 00000000000..96020c3c538
--- /dev/null
+++ 
b/regression-test/suites/load_p0/broker_load/test_bulk_load_empty_field_as_null.groovy
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_bulk_load_empty_field_as_null", "p0") {
+    def tableName = "test_bulk_load_empty_field_as_null"
+
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+            `k1` int(20) NULL,
+            `k2` string NULL,
+            `v1` date  NULL,
+            `v2` string  NULL,
+            `v3` datetime  NULL,
+            `v4` string  NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`k1`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+        PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+    """
+
+    def label = UUID.randomUUID().toString().replace("-", "0")
+
+    def sql_str = """
+            LOAD LABEL $label (
+                DATA 
INFILE("s3://${s3BucketName}/regression/load/data/empty_field_as_null.csv")
+                INTO TABLE $tableName
+                COLUMNS TERMINATED BY ","
+                FORMAT AS "CSV"
+                PROPERTIES (
+                    "empty_field_as_null" = "true"
+                )
+            )
+            WITH S3 (
+                "AWS_ACCESS_KEY" = "${getS3AK()}",
+                "AWS_SECRET_KEY" = "${getS3SK()}",
+                "AWS_ENDPOINT" = "${getS3Endpoint()}",
+                "AWS_REGION" = "${getS3Region()}",
+                "provider" = "${getS3Provider()}"
+            );
+            """
+    logger.info("submit sql: ${sql_str}");
+    sql """${sql_str}"""
+
+    def max_try_milli_secs = 600000
+    while (max_try_milli_secs > 0) {
+        String[][] result = sql """ show load where label="$label" order by 
createtime desc limit 1; """
+        if (result[0][2].equals("FINISHED")) {
+            logger.info("Load FINISHED " + label)
+            break
+        }
+        if (result[0][2].equals("CANCELLED")) {
+            def reason = result[0][7]
+            logger.info("load failed, reason:$reason")
+            assertTrue(1 == 2)
+            break
+        }
+        Thread.sleep(1000)
+        max_try_milli_secs -= 1000
+        if(max_try_milli_secs <= 0) {
+            assertTrue(1 == 2, "load Timeout: $label")
+        }
+    }
+
+    qt_sql """ SELECT * FROM ${tableName} order by k1 """
+
+}
diff --git 
a/regression-test/suites/load_p0/routine_load/test_routine_load_empty_field_as_null.groovy
 
b/regression-test/suites/load_p0/routine_load/test_routine_load_empty_field_as_null.groovy
new file mode 100644
index 00000000000..71d32db5fef
--- /dev/null
+++ 
b/regression-test/suites/load_p0/routine_load/test_routine_load_empty_field_as_null.groovy
@@ -0,0 +1,173 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+
+suite("test_routine_load_empty_field_as_null","p0") {
+    def kafkaCsvTpoics = [
+                "test_routine_load_empty_field_as_null",
+            ]
+    String enabled = context.config.otherConfigs.get("enableKafkaTest")
+    String kafka_port = context.config.otherConfigs.get("kafka_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    def kafka_broker = "${externalEnvIp}:${kafka_port}"
+    
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        def props = new Properties()
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"${kafka_broker}".toString())
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")  
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("=====try to connect Kafka========")
+                def partitions = 
prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connect fail: 
${e.message}".toString())
+            }
+        }
+        def producer = new KafkaProducer<>(props)
+        try {
+            logger.info("Kafka connecting: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("can't get any kafka info")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e  
+        }
+        logger.info("Kafka connect success")
+        for (String kafkaCsvTopic in kafkaCsvTpoics) {
+            def testData = [
+                "9,\\N,2023-07-15,def,2023-07-20T05:48:31,ghi",
+                "10,,2023-07-15,def,2023-07-20T05:48:31,ghi"
+            ]
+            
+            testData.each { line ->
+                logger.info("Sending data to kafka: ${line}")
+                def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
+                producer.send(record)
+            }
+        }
+
+        def tableName = "test_routine_load_empty_field_as_null"
+        def job = "test_follower_routine_load"
+        sql """ DROP TABLE IF EXISTS ${tableName} """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+                `k1` int(20) NULL,
+                `k2` string NULL,
+                `v1` date  NULL,
+                `v2` string  NULL,
+                `v3` datetime  NULL,
+                `v4` string  NULL
+            ) ENGINE=OLAP
+            DUPLICATE KEY(`k1`)
+            COMMENT 'OLAP'
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+            PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+        """
+
+        try {
+            sql """
+                CREATE ROUTINE LOAD ${job} ON ${tableName}
+                COLUMNS TERMINATED BY ","
+                PROPERTIES
+                (
+                    "empty_field_as_null" = "false"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${kafka_broker}",
+                    "kafka_topic" = "${kafkaCsvTpoics[0]}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+
+            def count = 0
+            def maxWaitCount = 60
+            while (count < maxWaitCount) {
+                def state = sql "show routine load for ${job}"
+                def routineLoadState = state[0][8].toString()
+                def statistic = state[0][14].toString()
+                logger.info("Routine load state: ${routineLoadState}")
+                logger.info("Routine load statistic: ${statistic}")
+                def rowCount = sql "select count(*) from ${tableName}"
+                if (routineLoadState == "RUNNING" && rowCount[0][0] > 0) {
+                    break
+                }
+                sleep(1000)
+                count++
+            }
+            qt_select_1 """
+                SELECT * FROM ${tableName} ORDER BY k1 
+            """
+
+            sql "pause routine load for ${job}"
+            def res = sql "show routine load for ${job}"
+            logger.info("routine load job properties: 
${res[0][11].toString()}".toString())
+            sql "ALTER ROUTINE LOAD FOR ${job} 
PROPERTIES(\"empty_field_as_null\" = \"true\");"
+            sql "truncate table ${tableName}"
+            sql "resume routine load for ${job}"
+            for (String kafkaCsvTopic in kafkaCsvTpoics) {
+                def testData = [
+                    "9,\\N,2023-07-15,def,2023-07-20T05:48:31,ghi",
+                    "10,,2023-07-15,def,2023-07-20T05:48:31,ghi"
+                ]
+                testData.each { line ->
+                    logger.info("Sending data to kafka: ${line}")
+                    def record = new ProducerRecord<>(kafkaCsvTopic, null, 
line)
+                    producer.send(record)
+                }
+            }
+
+            count = 0
+            while (count < maxWaitCount) {
+                def state = sql "show routine load for ${job}"
+                def routineLoadState = state[0][8].toString()
+                def statistic = state[0][14].toString()
+                logger.info("Routine load state: ${routineLoadState}")
+                logger.info("Routine load statistic: ${statistic}")
+                def rowCount = sql "select count(*) from ${tableName}"
+                if (routineLoadState == "RUNNING" && rowCount[0][0] > 0) {
+                    break
+                }
+                sleep(1000)
+                count++
+            }
+            qt_select_2 """
+                SELECT * FROM ${tableName} ORDER BY k1 
+            """
+
+        } catch (Exception e) {
+            logger.error("Test failed with exception: ${e.message}")
+        } finally {
+            try {
+                sql "stop routine load for ${job}"
+            } catch (Exception e) {
+                logger.warn("Failed to stop routine load job: ${e.message}")
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/regression-test/suites/load_p0/stream_load/test_stream_load_empty_field_as_null.groovy
 
b/regression-test/suites/load_p0/stream_load/test_stream_load_empty_field_as_null.groovy
new file mode 100644
index 00000000000..b4046d16de4
--- /dev/null
+++ 
b/regression-test/suites/load_p0/stream_load/test_stream_load_empty_field_as_null.groovy
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_stream_load_empty_field_as_null", "p0") {
+
+    def tableName = "test_stream_load_empty_field_as_null"
+
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+            `k1` int(20) NULL,
+            `k2` string NULL,
+            `v1` date  NULL,
+            `v2` string  NULL,
+            `v3` datetime  NULL,
+            `v4` string  NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`k1`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+        PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+    """
+
+    streamLoad {
+        table "${tableName}"
+        set 'column_separator', ','
+
+        file "empty_field_as_null.csv"
+    }
+
+    sql "sync"
+    qt_select_1 """
+        SELECT * FROM ${tableName} ORDER BY k1 
+    """
+    sql "truncate table ${tableName}"
+
+    streamLoad {
+        table "${tableName}"
+        set 'column_separator', ','
+        set 'empty_field_as_null', 'true'
+
+        file "empty_field_as_null.csv"
+    }
+
+    sql "sync"
+    qt_select_2 """
+        SELECT * FROM ${tableName} ORDER BY k1 
+    """
+}
diff --git a/regression-test/suites/load_p0/tvf/test_tvf_empty_field_as_null.groovy b/regression-test/suites/load_p0/tvf/test_tvf_empty_field_as_null.groovy
new file mode 100644
index 00000000000..6abbc3969f4
--- /dev/null
+++ b/regression-test/suites/load_p0/tvf/test_tvf_empty_field_as_null.groovy
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_tvf_empty_field_as_null", "p0") {
+    String ak = getS3AK()
+    String sk = getS3SK()
+    String s3_endpoint = getS3Endpoint()
+    String region = getS3Region()
+    String bucket = context.config.otherConfigs.get("s3BucketName");
+
+    def tableName = "test_tvf_empty_field_as_null"
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+            `k1` int NULL,
+            `k2` varchar(50) NULL,
+            `v1` varchar(50)  NULL,
+            `v2` varchar(50)  NULL,
+            `v3` varchar(50)  NULL,
+            `v4` varchar(50)  NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`k1`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+        PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+    """
+
+    qt_select """ select * from S3 (
+                        "uri" = "http://${bucket}.${s3_endpoint}/regression/load/data/empty_field_as_null.csv",
+                        "ACCESS_KEY"= "${ak}",
+                        "SECRET_KEY" = "${sk}",
+                        "format" = "csv",
+                        "empty_field_as_null" = "true",
+                        "column_separator" = ",",
+                        "region" = "${region}"
+                        );
+                    """
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to