This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new b3e9b3e8667 branch-4.0: [fix](stream-load) fix http_stream compressed
file truncation and stabilize sql_cache P0 cases (#64769)
b3e9b3e8667 is described below
commit b3e9b3e8667d06d6a3bb8de01e37b9014328b26d
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Wed Jun 24 23:28:44 2026 +0800
branch-4.0: [fix](stream-load) fix http_stream compressed file truncation
and stabilize sql_cache P0 cases (#64769)
## Proposed changes
Fixes several `branch-4.0` regression-test failures.
The first two fix intermittent P0 failures on the ASAN multi-FE pipeline
(e.g. CI build 199345):
`load_p0/http_stream/test_http_stream_properties` and the
`nereids_p0/cache` sql-cache cases. The last three fix `branch-4.0` P1
failures that also reproduce on the branch-3.1 P1 pipeline (e.g. CI
build 199346): `nereids_syntax_p1/mv/aggregate/agg_sync_mv`,
`tpch_unique_sql_zstd_bucket1_p1/sql/test_left_anti_join_batch_size`,
and the flaky `tpcds_sf1_p1/spill_test`.
### 1. `[fix](stream-load)` http_stream schema-inference truncation for
compressed files
A compressed (gz/bz2) load through the `http_stream` TVF intermittently
failed with:
```
[INTERNAL_ERROR]Compressed file has been truncated, which is not allowed
```
with the BE stack `fetch_table_schema` → `CsvReader::get_parsed_schema`
→ `CsvReader::_parse_col_nums` → `NewPlainTextLineReader::read_line`.
**Root cause:** the table schema is sniffed from the first part of the
request body buffered in `schema_buffer`.
`HttpStreamAction::on_chunk_data` is a per-chunk libevent callback, and
its end-of-callback block fired `process_put` (which triggers FE schema
inference over `schema_buffer`) whenever `is_read_schema` was still set
— i.e. at the end of *every* callback — treating "the current evbuffer
is drained" as "the whole body has been read". When the body spans
multiple chunk callbacks (common under load), schema inference runs on a
partial buffer. For uncompressed data a partial buffer is harmless, but
a partial compressed buffer is an incomplete compressed stream, so
decompression hits EOF before the stream end and reports truncation.
**Fix:** only trigger schema inference once the whole body has been
received — gate the `on_chunk_data` trigger on `receive_bytes >=
body_bytes` (Content-Length known), and trigger it at request completion
in `_handle` for the chunked / unknown-length case. The `>= 1MB` path is
unchanged.
### 2. `[test](sql-cache)` skip sql cache cases when the connected FE is
not master
`mv_with_sql_cache`, `mtmv_with_sql_cache`, `parse_sql_from_sql_cache`
assert that the sql cache is invalidated right after `RENAME ROLLUP` /
`MODIFY COLUMN` / `ADD PARTITION`. That invalidation only happens
locally on the FE that executes the DDL; on a follower FE the sql cache
is not invalidated on metadata replay (the fix #63612 was reverted by
#63872), so the `assertNoCache` checks are flaky when the suite happens
to connect to a follower FE. The cases are now skipped unless the
connected FE is the master FE.
### 3. `[fix](test)` avoid a duplicate internal mv column name in
`agg_sync_mv`
`nereids_syntax_p1/mv/aggregate/agg_sync_mv` failed deterministically
when creating a second sync mv over the same column:
```
create materialized view mv_sync41 as select id as g3, stddev_pop(kint)
from agg_mv_test group by id order by id;
=> errCode = 2, detailMessage = Duplicate column name '__stddev_1'
```
`stddev` and `stddev_pop` map to the same function class whose name is
`"stddev"` (likewise `variance` / `var_pop` → `"variance"`). Without an
explicit alias the inferred internal value-column name
(`InferPlanOutputAlias`: `"__" + name + "_" + index`) is `__stddev_1`
for both `mv_sync40` (`stddev`) and `mv_sync41` (`stddev_pop`), so the
second `create mv` collides with the first mv's column and is rejected
with `ERR_DUP_FIELDNAME`.
**Fix (test only):** give the two colliding aggregate columns an
explicit alias (`mv_sync41` `stddev_pop(kint)` → `as g3v`, `mv_sync48`
`var_pop(kint)` → `as s4v`). An explicit alias is not name-from-child,
so the inferred-name path is skipped and the internal column names no
longer collide. The queried results (`.out`) are unchanged.
### 4. `[fix](test)` stale `_p0` database reference in the tpch
anti-join case
`tpch_unique_sql_zstd_bucket1_p1/sql/test_left_anti_join_batch_size`
failed at analysis time with `Database
[regression_test_tpch_unique_sql_zstd_bucket1_p0] does not exist`. PR
#56986 moved the suite from `_p0` to `_p1` as a pure directory rename;
the framework derives the working database from the directory name (now
`..._p1`), but this one `.sql` still hard-coded the old fully-qualified
`regression_test_tpch_unique_sql_zstd_bucket1_p0.lineitem` while no
`_p0` loader exists anymore. Use the unqualified table name `lineitem`
(resolved against the current suite db) as every sibling `q*.sql` does,
and fix the hint typo `SEV_VAR` → `SET_VAR` (silently-ignored hints) so
`batch_size=3` is actually applied; results are unchanged.
### 5. `[test](pipeline)` exclude p1 spill cases from the non-cloud p1
pipeline
`tpcds_sf1_p1/spill_test` (e.g. `q23`) intermittently fails under the
ASAN p1 pipeline with a process-level `MEM_LIMIT_EXCEEDED` ("could not
find task to spill"): the query itself uses little memory, but with up
to `suiteParallel` memory-heavy spill suites running concurrently the
whole process trips the ASAN soft memory limit. `cloud_p1` already
excludes these three spill dirs for the same reason (PR #39853, "in
fixing, MEM_LIMIT_EXCEEDED"); mirror that exclusion in the non-cloud p1
pipeline.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
https://claude.ai/code/session_01LJWGGEQq3sx3m1tssKVBeR
---------
Co-authored-by: Claude Opus 4.8 (1M context) <[email protected]>
---
be/src/http/action/http_stream.cpp | 22 ++++++++++++++++++----
.../pipeline/p1/conf/regression-conf.groovy | 3 +++
.../nereids_p0/cache/mtmv_with_sql_cache.groovy | 11 +++++++++++
.../nereids_p0/cache/mv_with_sql_cache.groovy | 11 +++++++++++
.../cache/parse_sql_from_sql_cache.groovy | 11 +++++++++++
.../mv/aggregate/agg_sync_mv.groovy | 4 ++--
.../sql/test_left_anti_join_batch_size.sql | 6 +++---
7 files changed, 59 insertions(+), 9 deletions(-)
diff --git a/be/src/http/action/http_stream.cpp
b/be/src/http/action/http_stream.cpp
index c47a22164ec..2cfa6d9c697 100644
--- a/be/src/http/action/http_stream.cpp
+++ b/be/src/http/action/http_stream.cpp
@@ -130,6 +130,14 @@ Status HttpStreamAction::_handle(HttpRequest* http_req,
std::shared_ptr<StreamLo
<< ", receive_bytes=" << ctx->receive_bytes << ", id=" <<
ctx->id;
return Status::Error<ErrorCode::NETWORK_ERROR>("receive body don't
equal with body bytes");
}
+ // For a chunked / unknown-Content-Length body smaller than the schema
buffer, the schema
+ // has not been sniffed yet (on_chunk_data could not tell when the body
was complete, so it
+ // deferred to here). Now that the whole request body has been received,
trigger it on the
+ // full buffer before finishing the pipe.
+ if (ctx->is_read_schema) {
+ ctx->is_read_schema = false;
+ RETURN_IF_ERROR(process_put(http_req, ctx));
+ }
RETURN_IF_ERROR(ctx->body_sink->finish());
// wait stream load finish
@@ -286,10 +294,16 @@ void HttpStreamAction::on_chunk_data(HttpRequest* req) {
}
ctx->receive_bytes += remove_bytes;
}
- // after all the data has been read and it has not reached 1M, it will
execute here
- if (ctx->is_read_schema) {
- LOG(INFO) << "after all the data has been read and it has not reached
1M, it will execute "
- << "here";
+ // The schema is sniffed from `schema_buffer`. `on_chunk_data` is a
per-chunk callback,
+ // so a body that spans multiple callbacks would otherwise be
schema-sniffed on a partial
+ // buffer. That is harmless for uncompressed data, but for a compressed
body a partial
+ // buffer is an incomplete compressed stream and schema sniffing fails with
+ // "Compressed file has been truncated". So only trigger here once the
whole body has been
+ // received (Content-Length known). The chunked / unknown-length case is
handled at
+ // request completion in `_handle`.
+ if (ctx->is_read_schema && ctx->body_bytes > 0 && ctx->receive_bytes >=
ctx->body_bytes) {
+ LOG(INFO) << "all the data has been read and it has not reached 1M,
request fe to obtain "
+ << "column information. id=" << ctx->id;
ctx->is_read_schema = false;
ctx->status = process_put(req, ctx);
}
diff --git a/regression-test/pipeline/p1/conf/regression-conf.groovy
b/regression-test/pipeline/p1/conf/regression-conf.groovy
index 0d542ed5653..883b29f58e7 100644
--- a/regression-test/pipeline/p1/conf/regression-conf.groovy
+++ b/regression-test/pipeline/p1/conf/regression-conf.groovy
@@ -64,6 +64,9 @@ excludeSuites = "000_the_start_sentinel_do_not_touch," + //
keep this line as th
excludeDirectories = "000_the_start_sentinel_do_not_touch," + // keep this
line as the first line
"fault_injection_p0," +
"workload_manager_p1," +
+ "tpcds_sf1_unique_p1/spill," + // in fixing, MEM_LIMIT_EXCEEDED
+ "tpcds_sf1_p1/spill_test," + // in fixing, MEM_LIMIT_EXCEEDED
+ "tpch_sf0.1_p1/spill," + // in fixing, MEM_LIMIT_EXCEEDED
"zzz_the_end_sentinel_do_not_touch" // keep this line as the last line
cacheDataPath="/data/regression/"
diff --git a/regression-test/suites/nereids_p0/cache/mtmv_with_sql_cache.groovy
b/regression-test/suites/nereids_p0/cache/mtmv_with_sql_cache.groovy
index f6721453467..c80bd22f1da 100644
--- a/regression-test/suites/nereids_p0/cache/mtmv_with_sql_cache.groovy
+++ b/regression-test/suites/nereids_p0/cache/mtmv_with_sql_cache.groovy
@@ -48,6 +48,17 @@ class CanRetryException extends IllegalStateException {
}
suite("mtmv_with_sql_cache") {
+ // SQL cache invalidation on metadata replay is only reliable on the
master FE.
+ // On a follower/observer FE, a replayed RENAME ROLLUP / MODIFY COLUMN /
ADD PARTITION
+ // does NOT invalidate the local SQL cache (the fix #63612 was reverted by
#63872),
+ // so the assertNoCache checks below would be flaky. Skip unless connected
to the master.
+ def currentFe = sql_return_maparray("SHOW FRONTENDS").find {
it.CurrentConnected == "Yes" }
+ if (currentFe == null || !Boolean.parseBoolean(currentFe.IsMaster)) {
+ logger.info("Skip mtmv_with_sql_cache: connected FE is not master, "
+ + "SQL cache is not invalidated on follower metadata replay
(#63612 reverted by #63872).")
+ return
+ }
+
withGlobalLock("cache_last_version_interval_second") {
sql """ADMIN SET ALL FRONTENDS CONFIG
('cache_last_version_interval_second' = '0');"""
diff --git a/regression-test/suites/nereids_p0/cache/mv_with_sql_cache.groovy
b/regression-test/suites/nereids_p0/cache/mv_with_sql_cache.groovy
index 6b9fe0d656d..3e4554709e7 100644
--- a/regression-test/suites/nereids_p0/cache/mv_with_sql_cache.groovy
+++ b/regression-test/suites/nereids_p0/cache/mv_with_sql_cache.groovy
@@ -38,6 +38,17 @@ class CanRetryException extends IllegalStateException {
}
suite("mv_with_sql_cache") {
+ // SQL cache invalidation on metadata replay is only reliable on the
master FE.
+ // On a follower/observer FE, a replayed RENAME ROLLUP / MODIFY COLUMN /
ADD PARTITION
+ // does NOT invalidate the local SQL cache (the fix #63612 was reverted by
#63872),
+ // so the assertNoCache checks below would be flaky. Skip unless connected
to the master.
+ def currentFe = sql_return_maparray("SHOW FRONTENDS").find {
it.CurrentConnected == "Yes" }
+ if (currentFe == null || !Boolean.parseBoolean(currentFe.IsMaster)) {
+ logger.info("Skip mv_with_sql_cache: connected FE is not master, "
+ + "SQL cache is not invalidated on follower metadata replay
(#63612 reverted by #63872).")
+ return
+ }
+
withGlobalLock("cache_last_version_interval_second") {
sql """ADMIN SET ALL FRONTENDS CONFIG
('cache_last_version_interval_second' = '0');"""
sql """ADMIN SET ALL FRONTENDS CONFIG ('sql_cache_manage_num' =
'100000')"""
diff --git
a/regression-test/suites/nereids_p0/cache/parse_sql_from_sql_cache.groovy
b/regression-test/suites/nereids_p0/cache/parse_sql_from_sql_cache.groovy
index 477d20dd197..6de2760196b 100644
--- a/regression-test/suites/nereids_p0/cache/parse_sql_from_sql_cache.groovy
+++ b/regression-test/suites/nereids_p0/cache/parse_sql_from_sql_cache.groovy
@@ -40,6 +40,17 @@ class CanRetryException extends IllegalStateException {
}
suite("parse_sql_from_sql_cache") {
+ // SQL cache invalidation on metadata replay is only reliable on the
master FE.
+ // On a follower/observer FE, a replayed RENAME ROLLUP / MODIFY COLUMN /
ADD PARTITION
+ // does NOT invalidate the local SQL cache (the fix #63612 was reverted by
#63872),
+ // so the assertNoCache checks below would be flaky. Skip unless connected
to the master.
+ def currentFe = sql_return_maparray("SHOW FRONTENDS").find {
it.CurrentConnected == "Yes" }
+ if (currentFe == null || !Boolean.parseBoolean(currentFe.IsMaster)) {
+ logger.info("Skip parse_sql_from_sql_cache: connected FE is not
master, "
+ + "SQL cache is not invalidated on follower metadata replay
(#63612 reverted by #63872).")
+ return
+ }
+
withGlobalLock("cache_last_version_interval_second") {
def assertHasCache = { String sqlStr ->
try {
diff --git
a/regression-test/suites/nereids_syntax_p1/mv/aggregate/agg_sync_mv.groovy
b/regression-test/suites/nereids_syntax_p1/mv/aggregate/agg_sync_mv.groovy
index 4b71d29ca53..d3cc9f58ca6 100644
--- a/regression-test/suites/nereids_syntax_p1/mv/aggregate/agg_sync_mv.groovy
+++ b/regression-test/suites/nereids_syntax_p1/mv/aggregate/agg_sync_mv.groovy
@@ -387,7 +387,7 @@ suite("agg_sync_mv") {
qt_select_stddev_pop """select id, stddev_pop(kint) from agg_mv_test group
by id order by id;"""
sql """drop materialized view if exists mv_sync41 on agg_mv_test;"""
- createMV("""create materialized view mv_sync41 as select id as g3,
stddev_pop(kint) from agg_mv_test group by id order by id;""")
+ createMV("""create materialized view mv_sync41 as select id as g3,
stddev_pop(kint) as g3v from agg_mv_test group by id order by id;""")
mv_rewrite_any_success("select id, stddev_pop(kint) from agg_mv_test group
by id order by id;", ["mv_sync40", "mv_sync41"])
qt_select_stddev_pop_mv """select id, stddev_pop(kint) from agg_mv_test
group by id order by id;"""
@@ -429,7 +429,7 @@ suite("agg_sync_mv") {
qt_select_var_pop """select id, var_pop(kint) from agg_mv_test group by id
order by id;"""
sql """drop materialized view if exists mv_sync48 on agg_mv_test;"""
- createMV("""create materialized view mv_sync48 as select id as s4,
var_pop(kint) from agg_mv_test group by id order by id;""")
+ createMV("""create materialized view mv_sync48 as select id as s4,
var_pop(kint) as s4v from agg_mv_test group by id order by id;""")
mv_rewrite_any_success("select id, var_pop(kint) from agg_mv_test group by
id order by id;", ["mv_sync47", "mv_sync48"])
qt_select_var_pop_mv """select id, var_pop(kint) from agg_mv_test group by
id order by id;"""
diff --git
a/regression-test/suites/tpch_unique_sql_zstd_bucket1_p1/sql/test_left_anti_join_batch_size.sql
b/regression-test/suites/tpch_unique_sql_zstd_bucket1_p1/sql/test_left_anti_join_batch_size.sql
index 6a8dc6edf5c..af67ce10af0 100644
---
a/regression-test/suites/tpch_unique_sql_zstd_bucket1_p1/sql/test_left_anti_join_batch_size.sql
+++
b/regression-test/suites/tpch_unique_sql_zstd_bucket1_p1/sql/test_left_anti_join_batch_size.sql
@@ -1,16 +1,16 @@
-- tables: supplier,lineitem,orders,nation
-SELECT /*+SEV_VAR(batch_size=3)*/
+SELECT /*+SET_VAR(batch_size=3)*/
l1.l_orderkey okey,
l1.l_suppkey skey
FROM
- regression_test_tpch_unique_sql_zstd_bucket1_p0.lineitem l1
+ lineitem l1
WHERE
l1.l_receiptdate > l1.l_commitdate
AND l1.L_ORDERKEY < 10000
AND NOT exists(
SELECT *
FROM
- regression_test_tpch_unique_sql_zstd_bucket1_p0.lineitem l3
+ lineitem l3
WHERE
l3.l_orderkey = l1.l_orderkey
AND l3.l_suppkey <> l1.l_suppkey
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]