This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch bug/mempart-leak
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit e2e871d71c814de48cc62a661088038ff0c50d37
Author: Hongtao Gao <[email protected]>
AuthorDate: Tue Mar 17 08:27:10 2026 +0000

    fix(measure,stream,trace): release memPart ref on loop close in 
mustAddMemPart
    
    When mustAddMemPart exits early via the loopCloser.CloseNotify() path,
    the introduction's memPart reference was never decremented, causing a
    memory leak. Add decRef() call before returning in that branch.
    
    Co-Authored-By: Claude Sonnet 4.6 <[email protected]>
---
 CHANGES.md                      |  1 +
 banyand/measure/tstable.go      |  1 +
 banyand/measure/tstable_test.go | 16 ++++++++++++++++
 banyand/stream/tstable.go       |  1 +
 banyand/stream/tstable_test.go  | 16 ++++++++++++++++
 banyand/trace/tstable_test.go   | 16 ++++++++++++++++
 6 files changed, 51 insertions(+)

diff --git a/CHANGES.md b/CHANGES.md
index 50071ac63..6c7d98d6b 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -44,6 +44,7 @@ Release Notes.
 - Fix sidx tag filter range check returning inverted skip decision and use 
correct int64 encoding for block min/max.
 - Ignore take snapshot when no data.
 - Fix measure standalone write handler resetting accumulated groups on error, 
which dropped all successfully processed events in the batch.
+- Fix memory part reference leak in mustAddMemPart when tsTable loop closes.
 
 ### Document
 
diff --git a/banyand/measure/tstable.go b/banyand/measure/tstable.go
index d213cc989..5358b1634 100644
--- a/banyand/measure/tstable.go
+++ b/banyand/measure/tstable.go
@@ -322,6 +322,7 @@ func (tst *tsTable) mustAddMemPart(mp *memPart) {
        case tst.introductions <- ind:
        case <-tst.loopCloser.CloseNotify():
                tst.addPendingDataCount(-int64(totalCount))
+               ind.memPart.decRef()
                return
        }
        select {
diff --git a/banyand/measure/tstable_test.go b/banyand/measure/tstable_test.go
index afb44b918..24eaca361 100644
--- a/banyand/measure/tstable_test.go
+++ b/banyand/measure/tstable_test.go
@@ -297,6 +297,22 @@ var fieldProjections = map[int][]string{
        3: {"intField"},
 }
 
+func Test_mustAddMemPart_closeNotifyReleasesMemPart(t *testing.T) {
+       tst := &tsTable{
+               loopCloser:    run.NewCloser(1),
+               introductions: make(chan *introduction),
+       }
+
+       mp := generateMemPart()
+       mp.mustInitFromDataPoints(dpsTS1)
+       require.Greater(t, mp.partMetadata.TotalCount, uint64(0))
+
+       tst.Close()
+       tst.mustAddMemPart(mp)
+
+       require.Equal(t, uint64(0), mp.partMetadata.TotalCount)
+}
+
 var dpsTS1 = &dataPoints{
        seriesIDs:  []common.SeriesID{1, 2, 3},
        timestamps: []int64{1, 1, 1},
diff --git a/banyand/stream/tstable.go b/banyand/stream/tstable.go
index cbd4ce81b..c980a2e9c 100644
--- a/banyand/stream/tstable.go
+++ b/banyand/stream/tstable.go
@@ -333,6 +333,7 @@ func (tst *tsTable) mustAddMemPart(mp *memPart) {
        case tst.introductions <- ind:
        case <-tst.loopCloser.CloseNotify():
                tst.addPendingDataCount(-int64(totalCount))
+               ind.memPart.decRef()
                return
        }
        select {
diff --git a/banyand/stream/tstable_test.go b/banyand/stream/tstable_test.go
index ecdb2758a..0c86050ac 100644
--- a/banyand/stream/tstable_test.go
+++ b/banyand/stream/tstable_test.go
@@ -250,6 +250,22 @@ func Test_tstIter(t *testing.T) {
        })
 }
 
+func Test_mustAddMemPart_closeNotifyReleasesMemPart(t *testing.T) {
+       tst := &tsTable{
+               loopCloser:    run.NewCloser(1),
+               introductions: make(chan *introduction),
+       }
+
+       mp := generateMemPart()
+       mp.mustInitFromElements(esTS1)
+       require.Greater(t, mp.partMetadata.TotalCount, uint64(0))
+
+       tst.Close()
+       tst.mustAddMemPart(mp)
+
+       require.Equal(t, uint64(0), mp.partMetadata.TotalCount)
+}
+
 var esTS1 = &elements{
        seriesIDs:  []common.SeriesID{1, 2, 3},
        timestamps: []int64{1, 1, 1},
diff --git a/banyand/trace/tstable_test.go b/banyand/trace/tstable_test.go
index d6a3cdf19..1017d6049 100644
--- a/banyand/trace/tstable_test.go
+++ b/banyand/trace/tstable_test.go
@@ -235,6 +235,22 @@ var testSchemaTagTypes = map[string]pbv1.ValueType{
        "intTag":    pbv1.ValueTypeInt64,
 }
 
+func Test_mustAddMemPart_closeNotifyReleasesMemPart(t *testing.T) {
+       tst := &tsTable{
+               loopCloser:    run.NewCloser(1),
+               introductions: make(chan *introduction),
+       }
+
+       mp := generateMemPart()
+       mp.mustInitFromTraces(tsTS1)
+       require.Greater(t, mp.partMetadata.TotalCount, uint64(0))
+
+       tst.Close()
+       tst.mustAddMemPart(mp, nil)
+
+       require.Equal(t, uint64(0), mp.partMetadata.TotalCount)
+}
+
 var tsTS1 = &traces{
        traceIDs:   []string{"trace1", "trace2", "trace3"},
        timestamps: []int64{1, 1, 1},

Reply via email to