This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new b3e9b3e8667 branch-4.0: [fix](stream-load) fix http_stream compressed 
file truncation and stabilize sql_cache P0 cases (#64769)
b3e9b3e8667 is described below

commit b3e9b3e8667d06d6a3bb8de01e37b9014328b26d
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Wed Jun 24 23:28:44 2026 +0800

    branch-4.0: [fix](stream-load) fix http_stream compressed file truncation 
and stabilize sql_cache P0 cases (#64769)
    
    ## Proposed changes
    
    Fixes several `branch-4.0` regression-test failures.
    
    The first two fix intermittent P0 failures on the ASAN multi-FE pipeline
    (e.g. CI build 199345):
    `load_p0/http_stream/test_http_stream_properties` and the
    `nereids_p0/cache` sql-cache cases. The last three fix `branch-4.0` P1
    failures that also reproduce on the branch-3.1 P1 pipeline (e.g. CI
    build 199346): `nereids_syntax_p1/mv/aggregate/agg_sync_mv`,
    `tpch_unique_sql_zstd_bucket1_p1/sql/test_left_anti_join_batch_size`,
    and the flaky `tpcds_sf1_p1/spill_test`.
    
    ### 1. `[fix](stream-load)` http_stream schema-inference truncation for
    compressed files
    
    A compressed (gz/bz2) load through the `http_stream` TVF intermittently
    failed with:
    
    ```
    [INTERNAL_ERROR]Compressed file has been truncated, which is not allowed
    ```
    
    with the BE stack `fetch_table_schema` → `CsvReader::get_parsed_schema`
    → `CsvReader::_parse_col_nums` → `NewPlainTextLineReader::read_line`.
    
    **Root cause:** the table schema is sniffed from the first part of the
    request body buffered in `schema_buffer`.
    `HttpStreamAction::on_chunk_data` is a per-chunk libevent callback, and
    its end-of-callback block fired `process_put` (which triggers FE schema
    inference over `schema_buffer`) whenever `is_read_schema` was still set
    — i.e. at the end of *every* callback — treating "the current evbuffer
    is drained" as "the whole body has been read". When the body spans
    multiple chunk callbacks (common under load), schema inference runs on a
    partial buffer. For uncompressed data a partial buffer is harmless, but
    a partial compressed buffer is an incomplete compressed stream, so
    decompression hits EOF before the stream end and reports truncation.
    
    **Fix:** only trigger schema inference once the whole body has been
    received — gate the `on_chunk_data` trigger on `receive_bytes >=
    body_bytes` (Content-Length known), and trigger it at request completion
    in `_handle` for the chunked / unknown-length case. The `>= 1MB` path is
    unchanged.
    
    ### 2. `[test](sql-cache)` skip sql cache cases when the connected FE is
    not master
    
    `mv_with_sql_cache`, `mtmv_with_sql_cache`, `parse_sql_from_sql_cache`
    assert that the sql cache is invalidated right after `RENAME ROLLUP` /
    `MODIFY COLUMN` / `ADD PARTITION`. That invalidation only happens
    locally on the FE that executes the DDL; on a follower FE the sql cache
    is not invalidated on metadata replay (the fix #63612 was reverted by
    #63872), so the `assertNoCache` checks are flaky when the suite happens
    to connect to a follower FE. The cases are now skipped unless the
    connected FE is the master FE.
    
    ### 3. `[fix](test)` avoid a duplicate internal mv column name in
    `agg_sync_mv`
    
    `nereids_syntax_p1/mv/aggregate/agg_sync_mv` failed deterministically
    when creating a second sync mv over the same column:
    
    ```
    create materialized view mv_sync41 as select id as g3, stddev_pop(kint) 
from agg_mv_test group by id order by id;
    => errCode = 2, detailMessage = Duplicate column name '__stddev_1'
    ```
    
    `stddev` and `stddev_pop` map to the same function class whose name is
    `"stddev"` (likewise `variance` / `var_pop` → `"variance"`). Without an
    explicit alias the inferred internal value-column name
    (`InferPlanOutputAlias`: `"__" + name + "_" + index`) is `__stddev_1`
    for both `mv_sync40` (`stddev`) and `mv_sync41` (`stddev_pop`), so the
    second `create mv` collides with the first mv's column and is rejected
    with `ERR_DUP_FIELDNAME`.
    
    **Fix (test only):** give the two colliding aggregate columns an
    explicit alias (`mv_sync41` `stddev_pop(kint)` → `as g3v`, `mv_sync48`
    `var_pop(kint)` → `as s4v`). An explicit alias is not name-from-child,
    so the inferred-name path is skipped and the internal column names no
    longer collide. The queried results (`.out`) are unchanged.
    
    ### 4. `[fix](test)` stale `_p0` database reference in the tpch
    anti-join case
    
    `tpch_unique_sql_zstd_bucket1_p1/sql/test_left_anti_join_batch_size`
    failed at analysis time with `Database
    [regression_test_tpch_unique_sql_zstd_bucket1_p0] does not exist`. PR
    #56986 moved the suite from `_p0` to `_p1` as a pure directory rename;
    the framework derives the working database from the directory name (now
    `..._p1`), but this one `.sql` still hard-coded the old fully-qualified
    `regression_test_tpch_unique_sql_zstd_bucket1_p0.lineitem` while no
    `_p0` loader exists anymore. Use the unqualified table name `lineitem`
    (resolved against the current suite db) as every sibling `q*.sql` does,
    and fix the hint typo `SEV_VAR` → `SET_VAR` (silently-ignored hints) so
    `batch_size=3` is actually applied; results are unchanged.
    
    ### 5. `[test](pipeline)` exclude p1 spill cases from the non-cloud p1
    pipeline
    
    `tpcds_sf1_p1/spill_test` (e.g. `q23`) intermittently fails under the
    ASAN p1 pipeline with a process-level `MEM_LIMIT_EXCEEDED` ("could not
    find task to spill"): the query itself uses little memory, but with up
    to `suiteParallel` memory-heavy spill suites running concurrently the
    whole process trips the ASAN soft memory limit. `cloud_p1` already
    excludes these three spill dirs for the same reason (PR #39853, "in
    fixing, MEM_LIMIT_EXCEEDED"); mirror that exclusion in the non-cloud p1
    pipeline.
    
    🤖 Generated with [Claude Code](https://claude.com/claude-code)
    
    https://claude.ai/code/session_01LJWGGEQq3sx3m1tssKVBeR
    
    ---------
    
    Co-authored-by: Claude Opus 4.8 (1M context) <[email protected]>
---
 be/src/http/action/http_stream.cpp                 | 22 ++++++++++++++++++----
 .../pipeline/p1/conf/regression-conf.groovy        |  3 +++
 .../nereids_p0/cache/mtmv_with_sql_cache.groovy    | 11 +++++++++++
 .../nereids_p0/cache/mv_with_sql_cache.groovy      | 11 +++++++++++
 .../cache/parse_sql_from_sql_cache.groovy          | 11 +++++++++++
 .../mv/aggregate/agg_sync_mv.groovy                |  4 ++--
 .../sql/test_left_anti_join_batch_size.sql         |  6 +++---
 7 files changed, 59 insertions(+), 9 deletions(-)

diff --git a/be/src/http/action/http_stream.cpp 
b/be/src/http/action/http_stream.cpp
index c47a22164ec..2cfa6d9c697 100644
--- a/be/src/http/action/http_stream.cpp
+++ b/be/src/http/action/http_stream.cpp
@@ -130,6 +130,14 @@ Status HttpStreamAction::_handle(HttpRequest* http_req, 
std::shared_ptr<StreamLo
                      << ", receive_bytes=" << ctx->receive_bytes << ", id=" << 
ctx->id;
         return Status::Error<ErrorCode::NETWORK_ERROR>("receive body don't 
equal with body bytes");
     }
+    // For a chunked / unknown-Content-Length body smaller than the schema 
buffer, the schema
+    // has not been sniffed yet (on_chunk_data could not tell when the body 
was complete, so it
+    // deferred to here). Now that the whole request body has been received, 
trigger it on the
+    // full buffer before finishing the pipe.
+    if (ctx->is_read_schema) {
+        ctx->is_read_schema = false;
+        RETURN_IF_ERROR(process_put(http_req, ctx));
+    }
     RETURN_IF_ERROR(ctx->body_sink->finish());
 
     // wait stream load finish
@@ -286,10 +294,16 @@ void HttpStreamAction::on_chunk_data(HttpRequest* req) {
         }
         ctx->receive_bytes += remove_bytes;
     }
-    // after all the data has been read and it has not reached 1M, it will 
execute here
-    if (ctx->is_read_schema) {
-        LOG(INFO) << "after all the data has been read and it has not reached 
1M, it will execute "
-                  << "here";
+    // The schema is sniffed from `schema_buffer`. `on_chunk_data` is a 
per-chunk callback,
+    // so a body that spans multiple callbacks would otherwise be 
schema-sniffed on a partial
+    // buffer. That is harmless for uncompressed data, but for a compressed 
body a partial
+    // buffer is an incomplete compressed stream and schema sniffing fails with
+    // "Compressed file has been truncated". So only trigger here once the 
whole body has been
+    // received (Content-Length known). The chunked / unknown-length case is 
handled at
+    // request completion in `_handle`.
+    if (ctx->is_read_schema && ctx->body_bytes > 0 && ctx->receive_bytes >= 
ctx->body_bytes) {
+        LOG(INFO) << "all the data has been read and it has not reached 1M, 
request fe to obtain "
+                  << "column information. id=" << ctx->id;
         ctx->is_read_schema = false;
         ctx->status = process_put(req, ctx);
     }
diff --git a/regression-test/pipeline/p1/conf/regression-conf.groovy 
b/regression-test/pipeline/p1/conf/regression-conf.groovy
index 0d542ed5653..883b29f58e7 100644
--- a/regression-test/pipeline/p1/conf/regression-conf.groovy
+++ b/regression-test/pipeline/p1/conf/regression-conf.groovy
@@ -64,6 +64,9 @@ excludeSuites = "000_the_start_sentinel_do_not_touch," + // 
keep this line as th
 excludeDirectories = "000_the_start_sentinel_do_not_touch," + // keep this 
line as the first line
     "fault_injection_p0," +
     "workload_manager_p1," +
+    "tpcds_sf1_unique_p1/spill," + // in fixing, MEM_LIMIT_EXCEEDED
+    "tpcds_sf1_p1/spill_test," + // in fixing, MEM_LIMIT_EXCEEDED
+    "tpch_sf0.1_p1/spill," + // in fixing, MEM_LIMIT_EXCEEDED
     "zzz_the_end_sentinel_do_not_touch" // keep this line as the last line
 
 cacheDataPath="/data/regression/"
diff --git a/regression-test/suites/nereids_p0/cache/mtmv_with_sql_cache.groovy 
b/regression-test/suites/nereids_p0/cache/mtmv_with_sql_cache.groovy
index f6721453467..c80bd22f1da 100644
--- a/regression-test/suites/nereids_p0/cache/mtmv_with_sql_cache.groovy
+++ b/regression-test/suites/nereids_p0/cache/mtmv_with_sql_cache.groovy
@@ -48,6 +48,17 @@ class CanRetryException extends IllegalStateException {
 }
 
 suite("mtmv_with_sql_cache") {
+    // SQL cache invalidation on metadata replay is only reliable on the 
master FE.
+    // On a follower/observer FE, a replayed RENAME ROLLUP / MODIFY COLUMN / 
ADD PARTITION
+    // does NOT invalidate the local SQL cache (the fix #63612 was reverted by 
#63872),
+    // so the assertNoCache checks below would be flaky. Skip unless connected 
to the master.
+    def currentFe = sql_return_maparray("SHOW FRONTENDS").find { 
it.CurrentConnected == "Yes" }
+    if (currentFe == null || !Boolean.parseBoolean(currentFe.IsMaster)) {
+        logger.info("Skip mtmv_with_sql_cache: connected FE is not master, "
+                + "SQL cache is not invalidated on follower metadata replay 
(#63612 reverted by #63872).")
+        return
+    }
+
     withGlobalLock("cache_last_version_interval_second") {
 
         sql """ADMIN SET ALL FRONTENDS CONFIG 
('cache_last_version_interval_second' = '0');"""
diff --git a/regression-test/suites/nereids_p0/cache/mv_with_sql_cache.groovy 
b/regression-test/suites/nereids_p0/cache/mv_with_sql_cache.groovy
index 6b9fe0d656d..3e4554709e7 100644
--- a/regression-test/suites/nereids_p0/cache/mv_with_sql_cache.groovy
+++ b/regression-test/suites/nereids_p0/cache/mv_with_sql_cache.groovy
@@ -38,6 +38,17 @@ class CanRetryException extends IllegalStateException {
 }
 
 suite("mv_with_sql_cache") {
+    // SQL cache invalidation on metadata replay is only reliable on the 
master FE.
+    // On a follower/observer FE, a replayed RENAME ROLLUP / MODIFY COLUMN / 
ADD PARTITION
+    // does NOT invalidate the local SQL cache (the fix #63612 was reverted by 
#63872),
+    // so the assertNoCache checks below would be flaky. Skip unless connected 
to the master.
+    def currentFe = sql_return_maparray("SHOW FRONTENDS").find { 
it.CurrentConnected == "Yes" }
+    if (currentFe == null || !Boolean.parseBoolean(currentFe.IsMaster)) {
+        logger.info("Skip mv_with_sql_cache: connected FE is not master, "
+                + "SQL cache is not invalidated on follower metadata replay 
(#63612 reverted by #63872).")
+        return
+    }
+
     withGlobalLock("cache_last_version_interval_second") {
         sql """ADMIN SET ALL FRONTENDS CONFIG 
('cache_last_version_interval_second' = '0');"""
         sql """ADMIN SET ALL FRONTENDS CONFIG ('sql_cache_manage_num' = 
'100000')"""
diff --git 
a/regression-test/suites/nereids_p0/cache/parse_sql_from_sql_cache.groovy 
b/regression-test/suites/nereids_p0/cache/parse_sql_from_sql_cache.groovy
index 477d20dd197..6de2760196b 100644
--- a/regression-test/suites/nereids_p0/cache/parse_sql_from_sql_cache.groovy
+++ b/regression-test/suites/nereids_p0/cache/parse_sql_from_sql_cache.groovy
@@ -40,6 +40,17 @@ class CanRetryException extends IllegalStateException {
 }
 
 suite("parse_sql_from_sql_cache") {
+    // SQL cache invalidation on metadata replay is only reliable on the 
master FE.
+    // On a follower/observer FE, a replayed RENAME ROLLUP / MODIFY COLUMN / 
ADD PARTITION
+    // does NOT invalidate the local SQL cache (the fix #63612 was reverted by 
#63872),
+    // so the assertNoCache checks below would be flaky. Skip unless connected 
to the master.
+    def currentFe = sql_return_maparray("SHOW FRONTENDS").find { 
it.CurrentConnected == "Yes" }
+    if (currentFe == null || !Boolean.parseBoolean(currentFe.IsMaster)) {
+        logger.info("Skip parse_sql_from_sql_cache: connected FE is not 
master, "
+                + "SQL cache is not invalidated on follower metadata replay 
(#63612 reverted by #63872).")
+        return
+    }
+
     withGlobalLock("cache_last_version_interval_second") {
         def assertHasCache = { String sqlStr ->
             try {
diff --git 
a/regression-test/suites/nereids_syntax_p1/mv/aggregate/agg_sync_mv.groovy 
b/regression-test/suites/nereids_syntax_p1/mv/aggregate/agg_sync_mv.groovy
index 4b71d29ca53..d3cc9f58ca6 100644
--- a/regression-test/suites/nereids_syntax_p1/mv/aggregate/agg_sync_mv.groovy
+++ b/regression-test/suites/nereids_syntax_p1/mv/aggregate/agg_sync_mv.groovy
@@ -387,7 +387,7 @@ suite("agg_sync_mv") {
 
     qt_select_stddev_pop """select id, stddev_pop(kint) from agg_mv_test group 
by id order by id;"""
     sql """drop materialized view if exists mv_sync41 on agg_mv_test;"""
-    createMV("""create materialized view mv_sync41 as select id as g3, 
stddev_pop(kint) from agg_mv_test group by id order by id;""")
+    createMV("""create materialized view mv_sync41 as select id as g3, 
stddev_pop(kint) as g3v from agg_mv_test group by id order by id;""")
     mv_rewrite_any_success("select id, stddev_pop(kint) from agg_mv_test group 
by id order by id;", ["mv_sync40", "mv_sync41"])
     qt_select_stddev_pop_mv """select id, stddev_pop(kint) from agg_mv_test 
group by id order by id;"""
 
@@ -429,7 +429,7 @@ suite("agg_sync_mv") {
 
     qt_select_var_pop """select id, var_pop(kint) from agg_mv_test group by id 
order by id;"""
     sql """drop materialized view if exists mv_sync48 on agg_mv_test;"""
-    createMV("""create materialized view mv_sync48 as select id as s4, 
var_pop(kint) from agg_mv_test group by id order by id;""")
+    createMV("""create materialized view mv_sync48 as select id as s4, 
var_pop(kint) as s4v from agg_mv_test group by id order by id;""")
     mv_rewrite_any_success("select id, var_pop(kint) from agg_mv_test group by 
id order by id;", ["mv_sync47", "mv_sync48"])
     qt_select_var_pop_mv """select id, var_pop(kint) from agg_mv_test group by 
id order by id;"""
 
diff --git 
a/regression-test/suites/tpch_unique_sql_zstd_bucket1_p1/sql/test_left_anti_join_batch_size.sql
 
b/regression-test/suites/tpch_unique_sql_zstd_bucket1_p1/sql/test_left_anti_join_batch_size.sql
index 6a8dc6edf5c..af67ce10af0 100644
--- 
a/regression-test/suites/tpch_unique_sql_zstd_bucket1_p1/sql/test_left_anti_join_batch_size.sql
+++ 
b/regression-test/suites/tpch_unique_sql_zstd_bucket1_p1/sql/test_left_anti_join_batch_size.sql
@@ -1,16 +1,16 @@
 -- tables: supplier,lineitem,orders,nation
-SELECT /*+SEV_VAR(batch_size=3)*/
+SELECT /*+SET_VAR(batch_size=3)*/
   l1.l_orderkey okey,
   l1.l_suppkey  skey
 FROM
-  regression_test_tpch_unique_sql_zstd_bucket1_p0.lineitem l1
+  lineitem l1
 WHERE
   l1.l_receiptdate > l1.l_commitdate
   AND l1.L_ORDERKEY < 10000
   AND NOT exists(
     SELECT *
     FROM
-      regression_test_tpch_unique_sql_zstd_bucket1_p0.lineitem l3
+      lineitem l3
     WHERE
       l3.l_orderkey = l1.l_orderkey
       AND l3.l_suppkey <> l1.l_suppkey


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to