This is an automated email from the ASF dual-hosted git repository.

liaoxin pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new e9f12fac47e [fix](load) fix no error url for stream load #38325 (#38417) e9f12fac47e is described below commit e9f12fac47e5703b5aadf9b0d34ef56c94c5706e Author: Xin Liao <liaoxin...@126.com> AuthorDate: Sun Jul 28 19:06:57 2024 +0800 [fix](load) fix no error url for stream load #38325 (#38417) cherry pick from #38325 --- .../runtime/stream_load/stream_load_executor.cpp | 4 +-- .../load_p0/stream_load/test_stream_load.groovy | 42 +++++++++++++++++++++- 2 files changed, 42 insertions(+), 4 deletions(-) diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp index 051aca3e130..0fcd4af1ce5 100644 --- a/be/src/runtime/stream_load/stream_load_executor.cpp +++ b/be/src/runtime/stream_load/stream_load_executor.cpp @@ -90,9 +90,7 @@ Status StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadConte // some users may rely on this error message. *status = Status::DataQualityError("too many filtered rows"); } - if (ctx->number_filtered_rows > 0 && !state->get_error_log_file_path().empty()) { - ctx->error_url = to_load_error_http_path(state->get_error_log_file_path()); - } + ctx->error_url = to_load_error_http_path(state->get_error_log_file_path()); if (status->ok()) { DorisMetrics::instance()->stream_receive_bytes_total->increment(ctx->receive_bytes); diff --git a/regression-test/suites/load_p0/stream_load/test_stream_load.groovy b/regression-test/suites/load_p0/stream_load/test_stream_load.groovy index 6c002b2d29b..574e2e1466e 100644 --- a/regression-test/suites/load_p0/stream_load/test_stream_load.groovy +++ b/regression-test/suites/load_p0/stream_load/test_stream_load.groovy @@ -66,6 +66,15 @@ suite("test_stream_load", "p0") { file 'test_strict_mode.csv' time 10000 // limit inflight 10s + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: 
${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertTrue(!result.contains("ErrorURL")) + } } sql "sync" @@ -90,6 +99,7 @@ suite("test_stream_load", "p0") { log.info("Stream load result: ${result}".toString()) def json = parseJson(result) assertEquals("fail", json.Status.toLowerCase()) + assertTrue(result.contains("ErrorURL")) assertEquals(2, json.NumberTotalRows) assertEquals(1, json.NumberFilteredRows) } @@ -117,6 +127,7 @@ suite("test_stream_load", "p0") { log.info("Stream load result: ${result}".toString()) def json = parseJson(result) assertEquals("success", json.Status.toLowerCase()) + assertTrue(result.contains("ErrorURL")) assertEquals(2, json.NumberTotalRows) assertEquals(1, json.NumberFilteredRows) } @@ -160,6 +171,7 @@ suite("test_stream_load", "p0") { log.info("Stream load result: ${result}".toString()) def json = parseJson(result) assertEquals("success", json.Status.toLowerCase()) + assertTrue(!result.contains("ErrorURL")) assertEquals(3, json.NumberTotalRows) assertEquals(0, json.NumberFilteredRows) } @@ -210,6 +222,7 @@ suite("test_stream_load", "p0") { log.info("Stream load result: ${result}".toString()) def json = parseJson(result) assertEquals("success", json.Status.toLowerCase()) + assertTrue(!result.contains("ErrorURL")) assertEquals(1, json.NumberTotalRows) assertEquals(0, json.NumberFilteredRows) } @@ -428,6 +441,7 @@ suite("test_stream_load", "p0") { log.info("Stream load result: ${result}".toString()) def json = parseJson(result) assertEquals("success", json.Status.toLowerCase()) + assertTrue(!result.contains("ErrorURL")) } } sql "sync" @@ -453,6 +467,7 @@ suite("test_stream_load", "p0") { log.info("Stream load result: ${result}".toString()) def json = parseJson(result) assertEquals("success", json.Status.toLowerCase()) + assertTrue(!result.contains("ErrorURL")) } } sql "sync" @@ -474,6 +489,7 @@ suite("test_stream_load", "p0") { log.info("Stream load result: 
${result}".toString()) def json = parseJson(result) assertEquals("success", json.Status.toLowerCase()) + assertTrue(!result.contains("ErrorURL")) assertEquals(4, json.NumberTotalRows) assertEquals(0, json.NumberFilteredRows) } @@ -497,6 +513,7 @@ suite("test_stream_load", "p0") { log.info("Stream load result: ${result}".toString()) def json = parseJson(result) assertEquals("success", json.Status.toLowerCase()) + assertTrue(!result.contains("ErrorURL")) assertEquals(2500, json.NumberTotalRows) assertEquals(0, json.NumberFilteredRows) } @@ -524,6 +541,7 @@ suite("test_stream_load", "p0") { log.info("Stream load result: ${result}".toString()) def json = parseJson(result) assertEquals("fail", json.Status.toLowerCase()) + assertTrue(result.contains("ErrorURL")) assertEquals(0, json.NumberLoadedRows) } } @@ -547,6 +565,7 @@ suite("test_stream_load", "p0") { log.info("Stream load result: ${result}".toString()) def json = parseJson(result) assertEquals("success", json.Status.toLowerCase()) + assertTrue(!result.contains("ErrorURL")) assertEquals(2500, json.NumberTotalRows) assertEquals(2500, json.NumberLoadedRows) assertEquals(0, json.NumberFilteredRows) @@ -579,6 +598,7 @@ suite("test_stream_load", "p0") { log.info("Stream load result: ${result}".toString()) def json = parseJson(result) assertEquals("success", json.Status.toLowerCase()) + assertTrue(!result.contains("ErrorURL")) assertEquals(2500, json.NumberTotalRows) assertEquals(11, json.NumberLoadedRows) assertEquals(0, json.NumberFilteredRows) @@ -609,6 +629,7 @@ suite("test_stream_load", "p0") { log.info("Stream load result: ${result}".toString()) def json = parseJson(result) assertEquals("success", json.Status.toLowerCase()) + assertTrue(!result.contains("ErrorURL")) assertEquals(2500, json.NumberTotalRows) assertEquals(2500, json.NumberLoadedRows) assertEquals(0, json.NumberFilteredRows) @@ -639,6 +660,7 @@ suite("test_stream_load", "p0") { log.info("Stream load result: ${result}".toString()) def json = 
parseJson(result) assertEquals("success", json.Status.toLowerCase()) + assertTrue(result.contains("ErrorURL")) assertEquals(2500, json.NumberTotalRows) assertEquals(0, json.NumberLoadedRows) assertEquals(2500, json.NumberFilteredRows) @@ -695,6 +717,7 @@ suite("test_stream_load", "p0") { log.info("Stream load result: ${result}".toString()) def json = parseJson(result) assertEquals("success", json.Status.toLowerCase()) + assertTrue(!result.contains("ErrorURL")) assertEquals(1025, json.NumberTotalRows) assertEquals(1025, json.NumberLoadedRows) assertEquals(0, json.NumberFilteredRows) @@ -723,6 +746,7 @@ suite("test_stream_load", "p0") { log.info("Stream load result: ${result}".toString()) def json = parseJson(result) assertEquals("success", json.Status.toLowerCase()) + assertTrue(!result.contains("ErrorURL")) assertEquals(8001, json.NumberTotalRows) assertEquals(8001, json.NumberLoadedRows) assertEquals(0, json.NumberFilteredRows) @@ -754,6 +778,7 @@ suite("test_stream_load", "p0") { log.info("Stream load result: ${result}".toString()) def json = parseJson(result) assertEquals("success", json.Status.toLowerCase()) + assertTrue(!result.contains("ErrorURL")) assertEquals(6, json.NumberTotalRows) assertEquals(6, json.NumberLoadedRows) assertEquals(0, json.NumberFilteredRows) @@ -782,6 +807,7 @@ suite("test_stream_load", "p0") { log.info("Stream load result: ${result}".toString()) def json = parseJson(result) assertEquals("success", json.Status.toLowerCase()) + assertTrue(!result.contains("ErrorURL")) assertEquals(5, json.NumberTotalRows) assertEquals(5, json.NumberLoadedRows) assertEquals(0, json.NumberFilteredRows) @@ -834,6 +860,7 @@ suite("test_stream_load", "p0") { log.info("Stream load result: ${result}".toString()) def json = parseJson(result) assertEquals("success", json.Status.toLowerCase()) + assertTrue(!result.contains("ErrorURL")) assertEquals(10, json.NumberTotalRows) assertEquals(10, json.NumberLoadedRows) assertEquals(0, json.NumberFilteredRows) @@ -884,6 
+911,7 @@ suite("test_stream_load", "p0") { log.info("Stream load result: ${result}".toString()) def json = parseJson(result) assertEquals("success", json.Status.toLowerCase()) + assertTrue(!result.contains("ErrorURL")) assertEquals(5, json.NumberTotalRows) assertEquals(5, json.NumberLoadedRows) assertEquals(0, json.NumberFilteredRows) @@ -936,6 +964,7 @@ suite("test_stream_load", "p0") { log.info("Stream load result: ${result}".toString()) def json = parseJson(result) assertEquals("success", json.Status.toLowerCase()) + assertTrue(!result.contains("ErrorURL")) assertEquals(14, json.NumberTotalRows) assertEquals(14, json.NumberLoadedRows) assertEquals(0, json.NumberFilteredRows) @@ -990,6 +1019,7 @@ suite("test_stream_load", "p0") { log.info("Stream load result: ${result}".toString()) def json = parseJson(result) assertEquals("success", json.Status.toLowerCase()) + assertTrue(!result.contains("ErrorURL")) assertEquals(11, json.NumberTotalRows) assertEquals(0, json.NumberFilteredRows) assertEquals(5, json.NumberUnselectedRows) @@ -1042,6 +1072,7 @@ suite("test_stream_load", "p0") { log.info("Stream load result: ${result}".toString()) def json = parseJson(result) assertEquals("success", json.Status.toLowerCase()) + assertTrue(!result.contains("ErrorURL")) assertEquals(2, json.NumberTotalRows) assertEquals(0, json.NumberFilteredRows) assertEquals(0, json.NumberUnselectedRows) @@ -1084,6 +1115,7 @@ suite("test_stream_load", "p0") { log.info("Stream load result: ${result}".toString()) def json = parseJson(result) assertEquals("success", json.Status.toLowerCase()) + assertTrue(!result.contains("ErrorURL")) assertEquals(2, json.NumberTotalRows) assertEquals(0, json.NumberFilteredRows) assertEquals(0, json.NumberUnselectedRows) @@ -1224,6 +1256,7 @@ suite("test_stream_load", "p0") { log.info("Stream load result: ${result}".toString()) def json = parseJson(result) assertEquals("success", json.Status.toLowerCase()) + assertTrue(!result.contains("ErrorURL")) assertEquals(2, 
json.NumberTotalRows) assertEquals(0, json.NumberFilteredRows) assertEquals(0, json.NumberUnselectedRows) @@ -1253,6 +1286,7 @@ suite("test_stream_load", "p0") { log.info("Stream load result: ${result}".toString()) def json = parseJson(result) assertEquals("success", json.Status.toLowerCase()) + assertTrue(!result.contains("ErrorURL")) assertEquals(2, json.NumberTotalRows) assertEquals(0, json.NumberFilteredRows) assertEquals(0, json.NumberUnselectedRows) @@ -1370,6 +1404,7 @@ suite("test_stream_load", "p0") { log.info("Stream load result: ${result}".toString()) def json = parseJson(result) assertEquals("success", json.Status.toLowerCase()) + assertTrue(!result.contains("ErrorURL")) } } sql "sync" @@ -1391,6 +1426,7 @@ suite("test_stream_load", "p0") { log.info("Stream load result: ${result}".toString()) def json = parseJson(result) assertEquals("success", json.Status.toLowerCase()) + assertTrue(!result.contains("ErrorURL")) } } sql "sync" @@ -1447,6 +1483,7 @@ suite("test_stream_load", "p0") { log.info("Stream load result: ${result}".toString()) def json = parseJson(result) assertEquals("success", json.Status.toLowerCase()) + assertTrue(!result.contains("ErrorURL")) } } @@ -1464,6 +1501,7 @@ suite("test_stream_load", "p0") { log.info("Stream load result: ${result}".toString()) def json = parseJson(result) assertEquals("success", json.Status.toLowerCase()) + assertTrue(!result.contains("ErrorURL")) } } @@ -1483,6 +1521,7 @@ suite("test_stream_load", "p0") { log.info("Stream load result: ${result}".toString()) def json = parseJson(result) assertEquals("success", json.Status.toLowerCase()) + assertTrue(!result.contains("ErrorURL")) } } sql "sync" @@ -1566,7 +1605,8 @@ suite("test_stream_load", "p0") { log.info("Stream load result: ${result}".toString()) def json = parseJson(result) assertEquals("fail", json.Status.toLowerCase()) - assertTrue(json.Message.contains("[DATA_QUALITY_ERROR]Encountered unqualified data, stop processing")) + 
assertTrue(result.contains("ErrorURL")) + assertTrue(json.Message.contains("Encountered unqualified data, stop processing")) } } } finally { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org