freemandealer commented on code in PR #27752:
URL: https://github.com/apache/doris/pull/27752#discussion_r1424943855


##########
be/src/http/action/stream_load.cpp:
##########
@@ -302,12 +309,21 @@ Status StreamLoadAction::_on_header(HttpRequest* 
http_req, std::shared_ptr<Strea
                 evhttp_request_get_connection(http_req->get_evhttp_request()), 
csv_max_body_bytes);
 #endif
     }
-
     if (!http_req->header(HttpHeaders::TRANSFER_ENCODING).empty()) {
         if (http_req->header(HttpHeaders::TRANSFER_ENCODING).find(CHUNK) != 
std::string::npos) {
             ctx->is_chunked_transfer = true;
         }
     }
+    if (http_req->header(HttpHeaders::CONTENT_LENGTH).empty() && 
!ctx->is_chunked_transfer) {
+        LOG(WARNING) << "content_length is empty and 
transfer-encoding!=chunked, please set "
+                        "content_length or transfer-encoding=chunked";
+        return Status::InternalError(
+                "content_length is empty and transfer-encoding!=chunked, 
please set content_length "
+                "or transfer-encoding=chunked");
+    } else if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty() && 
ctx->is_chunked_transfer) {

Review Comment:
   ditto, UNLIKELY



##########
regression-test/suites/load_p0/stream_load/test_stream_load.groovy:
##########
@@ -1575,5 +1575,136 @@ suite("test_stream_load", "p0") {
     } finally {
         sql """ DROP TABLE IF EXISTS ${tableName19} FORCE"""
     }
+
+   def sql_result = sql """ select Host, HttpPort from backends() where alive 
= true limit 1; """
+  
+   log.info(sql_result[0][0].toString())
+   log.info(sql_result[0][1].toString())
+   log.info(sql_result[0].size.toString())
+
+   def beHost=sql_result[0][0]
+   def beHttpPort=sql_result[0][1]
+   log.info("${beHost}".toString())
+   log.info("${beHttpPort}".toString());
+
+    //test be : chunked transfer + Content Length
+    try {
+       sql """ DROP TABLE IF EXISTS ${tableName16} """
+       sql """
+           CREATE TABLE IF NOT EXISTS ${tableName16} (
+               `k1` bigint(20) NULL DEFAULT "1",
+               `k2` bigint(20) NULL ,
+               `v1` tinyint(4) NULL,
+               `v2` tinyint(4) NULL,
+               `v3` tinyint(4) NULL,
+               `v4` DATETIME NULL
+           ) ENGINE=OLAP
+           DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+           PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+       """
+   
+       def command = "curl --location-trusted -u 
${context.config.feHttpUser}:${context.config.feHttpPassword} -H 
column_separator:| -H ${db}:${tableName16} -H Content-Length:0  -H 
Transfer-Encoding:chunked -H columns:k1,k2,v1,v2,v3 -T 
${context.dataPath}/test_chunked_transfer.csv 
http://${beHost}:${beHttpPort}/api/${db}/${tableName16}/_stream_load";
+       log.info("test chunked transfer command: ${command}")
+       def process = command.execute()
+       code = process.waitFor()
+       out = process.text
+       log.info("test chunked transfer result: ${out}".toString())
+       def json = parseJson(out)
+       assertEquals("fail", json.Status.toLowerCase())
+       assertTrue(json.Message.contains("[INTERNAL_ERROR]please do not set 
both content_length and transfer-encoding"))
+    } finally {
+       sql """ DROP TABLE IF EXISTS ${tableName16} FORCE"""
+    }
+
+
+    //test be : no chunked transfer + no Content Length
+    try {
+        sql """ DROP TABLE IF EXISTS ${tableName16} """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName16} (
+                `k1` bigint(20) NULL DEFAULT "1",
+                `k2` bigint(20) NULL ,
+                `v1` tinyint(4) NULL,
+                `v2` tinyint(4) NULL,
+                `v3` tinyint(4) NULL,
+                `v4` DATETIME NULL
+            ) ENGINE=OLAP
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+            PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+        """
+
+        def command = "curl --location-trusted -u 
${context.config.feHttpUser}:${context.config.feHttpPassword} -H 
column_separator:| -H ${db}:${tableName16} -H Content-Length:  -H 
Transfer-Encoding: -T ${context.dataPath}/test_chunked_transfer.csv 
http://${beHost}:${beHttpPort}/api/${db}/${tableName16}/_stream_load";
+        log.info("test chunked transfer command: ${command}")
+        def process = command.execute()
+        code = process.waitFor()
+        out = process.text
+        log.info("test chunked transfer result: ${out}".toString())
+        def json = parseJson(out)
+        assertEquals("fail", json.Status.toLowerCase())
+        assertTrue(json.Message.contains("[INTERNAL_ERROR]content_length is 
empty and transfer-encoding!=chunked, please set content_length or 
transfer-encoding=chunked"))
+    } finally {
+        sql """ DROP TABLE IF EXISTS ${tableName16} FORCE"""
+    }
+
+
+    //test be : no chunked transfer + Content Length=0
+    try {
+        sql """ DROP TABLE IF EXISTS ${tableName16} """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName16} (
+                `k1` bigint(20) NULL DEFAULT "1",
+                `k2` bigint(20) NULL ,
+                `v1` tinyint(4) NULL,
+                `v2` tinyint(4) NULL,
+                `v3` tinyint(4) NULL,
+                `v4` DATETIME NULL
+            ) ENGINE=OLAP
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+            PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+        """
+
+        def command = "curl --location-trusted -u 
${context.config.feHttpUser}:${context.config.feHttpPassword} -H 
column_separator:| -H ${db}:${tableName16} -H Content-Length:0  -H 
Transfer-Encoding: -H columns:k1,k2,v1,v2,v3 -T 
${context.dataPath}/test_chunked_transfer.csv 
http://${beHost}:${beHttpPort}/api/${db}/${tableName16}/_stream_load";
+        log.info("test chunked transfer command: ${command}")
+        def process = command.execute()
+        code = process.waitFor()
+        out = process.text
+        log.info("test chunked transfer result: ${out}".toString())
+        def json = parseJson(out)
+        assertEquals("success", json.Status.toLowerCase())
+        assertEquals(0, json.NumberTotalRows)
+    } finally {
+        sql """ DROP TABLE IF EXISTS ${tableName16} FORCE"""
+    }
+
+
+    //test fe : no chunked transfer + Content Length=0
+    try {
+        sql """ DROP TABLE IF EXISTS ${tableName16} """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName16} (
+                `k1` bigint(20) NULL DEFAULT "1",
+                `k2` bigint(20) NULL ,
+                `v1` tinyint(4) NULL,
+                `v2` tinyint(4) NULL,
+                `v3` tinyint(4) NULL,
+                `v4` DATETIME NULL
+            ) ENGINE=OLAP
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+            PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+        """
+
+        def command = "curl --location-trusted -u 
${context.config.feHttpUser}:${context.config.feHttpPassword} -H 
column_separator:| -H ${db}:${tableName16} -H Content-Length:0  -H 
Transfer-Encoding:  -H columns:k1,k2,v1,v2,v3 -T 
${context.dataPath}/test_chunked_transfer.csv 
http://${context.config.feHttpAddress}/api/${db}/${tableName16}/_stream_load";
+        log.info("test chunked transfer command: ${command}")
+        def process = command.execute()
+        code = process.waitFor()
+        out = process.text
+        log.info("test chunked transfer result: ${out}".toString())
+        def json = parseJson(out)
+        assertEquals("success", json.Status.toLowerCase())

Review Comment:
   why success?



##########
be/src/http/action/stream_load.cpp:
##########
@@ -302,12 +309,21 @@ Status StreamLoadAction::_on_header(HttpRequest* 
http_req, std::shared_ptr<Strea
                 evhttp_request_get_connection(http_req->get_evhttp_request()), 
csv_max_body_bytes);
 #endif
     }
-
     if (!http_req->header(HttpHeaders::TRANSFER_ENCODING).empty()) {
         if (http_req->header(HttpHeaders::TRANSFER_ENCODING).find(CHUNK) != 
std::string::npos) {
             ctx->is_chunked_transfer = true;
         }
     }
+    if (http_req->header(HttpHeaders::CONTENT_LENGTH).empty() && 
!ctx->is_chunked_transfer) {

Review Comment:
   please add UNLIKELY to make branch predictor happy



##########
regression-test/suites/load_p0/stream_load/test_stream_load.groovy:
##########
@@ -1575,5 +1575,136 @@ suite("test_stream_load", "p0") {
     } finally {
         sql """ DROP TABLE IF EXISTS ${tableName19} FORCE"""
     }
+
+   def sql_result = sql """ select Host, HttpPort from backends() where alive 
= true limit 1; """
+  
+   log.info(sql_result[0][0].toString())
+   log.info(sql_result[0][1].toString())
+   log.info(sql_result[0].size.toString())
+
+   def beHost=sql_result[0][0]
+   def beHttpPort=sql_result[0][1]
+   log.info("${beHost}".toString())
+   log.info("${beHttpPort}".toString());
+
+    //test be : chunked transfer + Content Length
+    try {
+       sql """ DROP TABLE IF EXISTS ${tableName16} """
+       sql """
+           CREATE TABLE IF NOT EXISTS ${tableName16} (
+               `k1` bigint(20) NULL DEFAULT "1",
+               `k2` bigint(20) NULL ,
+               `v1` tinyint(4) NULL,
+               `v2` tinyint(4) NULL,
+               `v3` tinyint(4) NULL,
+               `v4` DATETIME NULL
+           ) ENGINE=OLAP
+           DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+           PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+       """
+   
+       def command = "curl --location-trusted -u 
${context.config.feHttpUser}:${context.config.feHttpPassword} -H 
column_separator:| -H ${db}:${tableName16} -H Content-Length:0  -H 
Transfer-Encoding:chunked -H columns:k1,k2,v1,v2,v3 -T 
${context.dataPath}/test_chunked_transfer.csv 
http://${beHost}:${beHttpPort}/api/${db}/${tableName16}/_stream_load";
+       log.info("test chunked transfer command: ${command}")
+       def process = command.execute()
+       code = process.waitFor()
+       out = process.text
+       log.info("test chunked transfer result: ${out}".toString())
+       def json = parseJson(out)
+       assertEquals("fail", json.Status.toLowerCase())
+       assertTrue(json.Message.contains("[INTERNAL_ERROR]please do not set 
both content_length and transfer-encoding"))
+    } finally {
+       sql """ DROP TABLE IF EXISTS ${tableName16} FORCE"""
+    }
+
+
+    //test be : no chunked transfer + no Content Length
+    try {
+        sql """ DROP TABLE IF EXISTS ${tableName16} """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName16} (
+                `k1` bigint(20) NULL DEFAULT "1",
+                `k2` bigint(20) NULL ,
+                `v1` tinyint(4) NULL,
+                `v2` tinyint(4) NULL,
+                `v3` tinyint(4) NULL,
+                `v4` DATETIME NULL
+            ) ENGINE=OLAP
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+            PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+        """
+
+        def command = "curl --location-trusted -u 
${context.config.feHttpUser}:${context.config.feHttpPassword} -H 
column_separator:| -H ${db}:${tableName16} -H Content-Length:  -H 
Transfer-Encoding: -T ${context.dataPath}/test_chunked_transfer.csv 
http://${beHost}:${beHttpPort}/api/${db}/${tableName16}/_stream_load";
+        log.info("test chunked transfer command: ${command}")
+        def process = command.execute()
+        code = process.waitFor()
+        out = process.text
+        log.info("test chunked transfer result: ${out}".toString())
+        def json = parseJson(out)
+        assertEquals("fail", json.Status.toLowerCase())
+        assertTrue(json.Message.contains("[INTERNAL_ERROR]content_length is 
empty and transfer-encoding!=chunked, please set content_length or 
transfer-encoding=chunked"))
+    } finally {
+        sql """ DROP TABLE IF EXISTS ${tableName16} FORCE"""
+    }
+
+
+    //test be : no chunked transfer + Content Length=0
+    try {
+        sql """ DROP TABLE IF EXISTS ${tableName16} """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName16} (
+                `k1` bigint(20) NULL DEFAULT "1",
+                `k2` bigint(20) NULL ,
+                `v1` tinyint(4) NULL,
+                `v2` tinyint(4) NULL,
+                `v3` tinyint(4) NULL,
+                `v4` DATETIME NULL
+            ) ENGINE=OLAP
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+            PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+        """
+
+        def command = "curl --location-trusted -u 
${context.config.feHttpUser}:${context.config.feHttpPassword} -H 
column_separator:| -H ${db}:${tableName16} -H Content-Length:0  -H 
Transfer-Encoding: -H columns:k1,k2,v1,v2,v3 -T 
${context.dataPath}/test_chunked_transfer.csv 
http://${beHost}:${beHttpPort}/api/${db}/${tableName16}/_stream_load";
+        log.info("test chunked transfer command: ${command}")
+        def process = command.execute()
+        code = process.waitFor()
+        out = process.text
+        log.info("test chunked transfer result: ${out}".toString())
+        def json = parseJson(out)
+        assertEquals("success", json.Status.toLowerCase())

Review Comment:
   why success?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to