This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch bug/mempart-leak in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit e2e871d71c814de48cc62a661088038ff0c50d37 Author: Hongtao Gao <[email protected]> AuthorDate: Tue Mar 17 08:27:10 2026 +0000 fix(measure,stream,trace): release memPart ref on loop close in mustAddMemPart When mustAddMemPart exits early via the loopCloser.CloseNotify() path, the introduction's memPart reference was never decremented, causing a memory leak. Add decRef() call before returning in that branch. Co-Authored-By: Claude Sonnet 4.6 <[email protected]> --- CHANGES.md | 1 + banyand/measure/tstable.go | 1 + banyand/measure/tstable_test.go | 16 ++++++++++++++++ banyand/stream/tstable.go | 1 + banyand/stream/tstable_test.go | 16 ++++++++++++++++ banyand/trace/tstable_test.go | 16 ++++++++++++++++ 6 files changed, 51 insertions(+) diff --git a/CHANGES.md b/CHANGES.md index 50071ac63..6c7d98d6b 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -44,6 +44,7 @@ Release Notes. - Fix sidx tag filter range check returning inverted skip decision and use correct int64 encoding for block min/max. - Ignore take snapshot when no data. - Fix measure standalone write handler resetting accumulated groups on error, which dropped all successfully processed events in the batch. +- Fix memory part reference leak in mustAddMemPart when tsTable loop closes. ### Document diff --git a/banyand/measure/tstable.go b/banyand/measure/tstable.go index d213cc989..5358b1634 100644 --- a/banyand/measure/tstable.go +++ b/banyand/measure/tstable.go @@ -322,6 +322,7 @@ func (tst *tsTable) mustAddMemPart(mp *memPart) { case tst.introductions <- ind: case <-tst.loopCloser.CloseNotify(): tst.addPendingDataCount(-int64(totalCount)) + ind.memPart.decRef() return } select { diff --git a/banyand/measure/tstable_test.go b/banyand/measure/tstable_test.go index afb44b918..24eaca361 100644 --- a/banyand/measure/tstable_test.go +++ b/banyand/measure/tstable_test.go @@ -297,6 +297,22 @@ var fieldProjections = map[int][]string{ 3: {"intField"}, } +func Test_mustAddMemPart_closeNotifyReleasesMemPart(t *testing.T) { + tst := &tsTable{ + loopCloser: run.NewCloser(1), + introductions: make(chan *introduction), + } + + mp := generateMemPart() + mp.mustInitFromDataPoints(dpsTS1) + require.Greater(t, mp.partMetadata.TotalCount, uint64(0)) + + tst.Close() + tst.mustAddMemPart(mp) + + require.Equal(t, uint64(0), mp.partMetadata.TotalCount) +} + var dpsTS1 = &dataPoints{ seriesIDs: []common.SeriesID{1, 2, 3}, timestamps: []int64{1, 1, 1}, diff --git a/banyand/stream/tstable.go b/banyand/stream/tstable.go index cbd4ce81b..c980a2e9c 100644 --- a/banyand/stream/tstable.go +++ b/banyand/stream/tstable.go @@ -333,6 +333,7 @@ func (tst *tsTable) mustAddMemPart(mp *memPart) { case tst.introductions <- ind: case <-tst.loopCloser.CloseNotify(): tst.addPendingDataCount(-int64(totalCount)) + ind.memPart.decRef() return } select { diff --git a/banyand/stream/tstable_test.go b/banyand/stream/tstable_test.go index ecdb2758a..0c86050ac 100644 --- a/banyand/stream/tstable_test.go +++ b/banyand/stream/tstable_test.go @@ -250,6 +250,22 @@ func Test_tstIter(t *testing.T) { }) } +func Test_mustAddMemPart_closeNotifyReleasesMemPart(t *testing.T) { + tst := &tsTable{ + loopCloser: run.NewCloser(1), + introductions: make(chan *introduction), + } + + mp := generateMemPart() + mp.mustInitFromElements(esTS1) + require.Greater(t, mp.partMetadata.TotalCount, uint64(0)) + + tst.Close() + tst.mustAddMemPart(mp) + + require.Equal(t, uint64(0), mp.partMetadata.TotalCount) +} + var esTS1 = &elements{ seriesIDs: []common.SeriesID{1, 2, 3}, timestamps: []int64{1, 1, 1}, diff --git a/banyand/trace/tstable_test.go b/banyand/trace/tstable_test.go index d6a3cdf19..1017d6049 100644 --- a/banyand/trace/tstable_test.go +++ b/banyand/trace/tstable_test.go @@ -235,6 +235,22 @@ var testSchemaTagTypes = map[string]pbv1.ValueType{ "intTag": pbv1.ValueTypeInt64, } +func Test_mustAddMemPart_closeNotifyReleasesMemPart(t *testing.T) { + tst := &tsTable{ + loopCloser: run.NewCloser(1), + introductions: make(chan *introduction), + } + + mp := generateMemPart() + mp.mustInitFromTraces(tsTS1) + require.Greater(t, mp.partMetadata.TotalCount, uint64(0)) + + tst.Close() + tst.mustAddMemPart(mp, nil) + + require.Equal(t, uint64(0), mp.partMetadata.TotalCount) +} + var tsTS1 = &traces{ traceIDs: []string{"trace1", "trace2", "trace3"}, timestamps: []int64{1, 1, 1},
