This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new e975936d6 fix query segment ref release and chunked sync cleanup (#1013)
e975936d6 is described below

commit e975936d684c5c341bf59d8b187d8a4af410bbaf
Author: Gao Hongtao <[email protected]>
AuthorDate: Thu Mar 19 17:07:53 2026 +0800

    fix query segment ref release and chunked sync cleanup (#1013)
    
    * fix query segment ref release and chunked sync cleanup
    
    
    ---------
    
    Signed-off-by: Gao Hongtao <[email protected]>
---
 CHANGES.md                                         |   1 +
 banyand/measure/measure.go                         |  57 +++++++---
 banyand/measure/measure_suite_test.go              |   2 +-
 banyand/measure/metadata.go                        |  24 ++--
 banyand/measure/metrics.go                         |   1 +
 banyand/measure/query.go                           | 109 ++++++++++++++-----
 banyand/query/processor.go                         |   2 +
 banyand/query/query.go                             |   4 +-
 banyand/queue/sub/chunked_sync.go                  |  21 +++-
 banyand/queue/sub/server.go                        |  42 +------
 banyand/stream/query.go                            |  24 +++-
 banyand/stream/query_by_idx.go                     |   1 +
 banyand/stream/query_by_ts.go                      |   1 +
 banyand/stream/stream_suite_test.go                |   2 +-
 banyand/trace/query.go                             |  42 +++++--
 banyand/trace/trace_suite_test.go                  |   2 +-
 pkg/cmdsetup/data.go                               |   2 +-
 pkg/cmdsetup/standalone.go                         |   2 +-
 pkg/pool/pool.go                                   |  30 -----
 pkg/pool/pool_debug.go                             |  50 +++++++++
 pkg/pool/pool_nop.go                               |  29 +++++
 pkg/pool/tracker.go                                | 121 +++++++++++++++++++++
 pkg/pool/tracker_stub.go                           |  40 +++++++
 .../logical/measure/measure_plan_distributed.go    |  20 +++-
 24 files changed, 477 insertions(+), 152 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index b06b93753..76c802462 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -46,6 +46,7 @@ Release Notes.
 - Fix measure standalone write handler resetting accumulated groups on error, which dropped all successfully processed events in the batch.
 - Fix memory part reference leak in mustAddMemPart when tsTable loop closes.
 - Fix memory part leak in syncPartContext Close and prevent double-release in FinishSync.
+- Fix segment reference leaks in measure/stream/trace queries and ensure chunked sync sessions close part contexts correctly.
 
 ### Document
 
diff --git a/banyand/measure/measure.go b/banyand/measure/measure.go
index 29f5abcf1..4e36be0dd 100644
--- a/banyand/measure/measure.go
+++ b/banyand/measure/measure.go
@@ -26,9 +26,11 @@ import (
 
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        "github.com/apache/skywalking-banyandb/banyand/internal/storage"
+       "github.com/apache/skywalking-banyandb/banyand/observability"
        "github.com/apache/skywalking-banyandb/banyand/protector"
        "github.com/apache/skywalking-banyandb/banyand/queue"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/meter"
        "github.com/apache/skywalking-banyandb/pkg/partition"
        "github.com/apache/skywalking-banyandb/pkg/run"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
@@ -74,16 +76,17 @@ func (i *indexSchema) parse(schema *databasev1.Measure) {
 }
 
 type measure struct {
-       indexSchema atomic.Value
-       tsdb        atomic.Value
-       c           storage.Cache
-       pm          protector.Memory
-       l           *logger.Logger
-       schema      *databasev1.Measure
-       schemaRepo  *schemaRepo
-       name        string
-       group       string
-       interval    time.Duration
+       indexSchema  atomic.Value
+       tsdb         atomic.Value
+       c            storage.Cache
+       pm           protector.Memory
+       l            *logger.Logger
+       schema       *databasev1.Measure
+       schemaRepo   *schemaRepo
+       queryMetrics *queryMetrics
+       name         string
+       group        string
+       interval     time.Duration
 }
 
 func (m *measure) GetSchema() *databasev1.Measure {
@@ -120,15 +123,37 @@ type measureSpec struct {
        schema *databasev1.Measure
 }
 
+type queryMetrics struct {
+       queryLatency             meter.Histogram
+       queryErrors              meter.Counter
+       resultPoints             meter.Histogram
+       totalQueryResultStarted  meter.Counter
+       totalQueryResultFinished meter.Counter
+}
+
+func newQueryMetrics(factory observability.Factory) *queryMetrics {
+       if factory == nil {
+               return nil
+       }
+       return &queryMetrics{
+               queryLatency:             factory.NewHistogram("query_latency", 
meter.DefBuckets),
+               queryErrors:              
factory.NewCounter("total_query_errors"),
+               resultPoints:             factory.NewHistogram("result_points", 
meter.DefBuckets),
+               totalQueryResultStarted:  
factory.NewCounter("total_query_result_started"),
+               totalQueryResultFinished: 
factory.NewCounter("total_query_result_finished"),
+       }
+}
+
 func openMeasure(spec measureSpec,
-       l *logger.Logger, c storage.Cache, pm protector.Memory, schemaRepo 
*schemaRepo,
+       l *logger.Logger, c storage.Cache, pm protector.Memory, schemaRepo 
*schemaRepo, qm *queryMetrics,
 ) (*measure, error) {
        m := &measure{
-               schema:     spec.schema,
-               l:          l,
-               c:          c,
-               pm:         pm,
-               schemaRepo: schemaRepo,
+               schema:       spec.schema,
+               l:            l,
+               c:            c,
+               pm:           pm,
+               schemaRepo:   schemaRepo,
+               queryMetrics: qm,
        }
        if err := m.parseSpec(); err != nil {
                return nil, err
diff --git a/banyand/measure/measure_suite_test.go 
b/banyand/measure/measure_suite_test.go
index 141d1e015..2f9792e73 100644
--- a/banyand/measure/measure_suite_test.go
+++ b/banyand/measure/measure_suite_test.go
@@ -81,7 +81,7 @@ func setUp() (*services, func()) {
        measureService, err := measure.NewStandalone(metadataService, pipeline, 
nil, metricSvc, pm)
        gomega.Expect(err).NotTo(gomega.HaveOccurred())
        preloadMeasureSvc := &preloadMeasureService{metaSvc: metadataService}
-       querySvc, err := query.NewService(context.TODO(), nil, measureService, 
nil, metadataService, pipeline)
+       querySvc, err := query.NewService(context.TODO(), nil, measureService, 
nil, metadataService, pipeline, metricSvc)
        gomega.Expect(err).NotTo(gomega.HaveOccurred())
 
        var flags []string
diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go
index fd117050c..1517b8e68 100644
--- a/banyand/measure/metadata.go
+++ b/banyand/measure/metadata.go
@@ -24,6 +24,7 @@ import (
        "strconv"
        "strings"
        "sync"
+       "sync/atomic"
        "time"
 
        "github.com/cenkalti/backoff/v4"
@@ -615,15 +616,16 @@ func (sr *schemaRepo) 
stopAllProcessorsWithGroupPrefix(groupName string) {
 var _ resourceSchema.ResourceSupplier = (*supplier)(nil)
 
 type supplier struct {
-       metadata   metadata.Repo
-       omr        observability.MetricsRegistry
-       c          storage.Cache
-       pm         protector.Memory
-       l          *logger.Logger
-       schemaRepo *schemaRepo
-       nodeLabels map[string]string
-       path       string
-       option     option
+       metadata     metadata.Repo
+       omr          observability.MetricsRegistry
+       c            storage.Cache
+       pm           protector.Memory
+       l            *logger.Logger
+       schemaRepo   *schemaRepo
+       nodeLabels   map[string]string
+       queryMetrics atomic.Pointer[queryMetrics]
+       path         string
+       option       option
 }
 
 func newSupplier(path string, svc *standalone, sr *schemaRepo, nodeLabels 
map[string]string) *supplier {
@@ -654,7 +656,7 @@ func (s *supplier) OpenResource(spec 
resourceSchema.Resource) (resourceSchema.In
        measureSchema := spec.Schema().(*databasev1.Measure)
        return openMeasure(measureSpec{
                schema: measureSchema,
-       }, s.l, s.c, s.pm, s.schemaRepo)
+       }, s.l, s.c, s.pm, s.schemaRepo, s.queryMetrics.Load())
 }
 
 func (s *supplier) ResourceSchema(md *commonv1.Metadata) 
(resourceSchema.ResourceSchema, error) {
@@ -764,7 +766,7 @@ func (s *queueSupplier) OpenResource(spec 
resourceSchema.Resource) (resourceSche
        measureSchema := spec.Schema().(*databasev1.Measure)
        return openMeasure(measureSpec{
                schema: measureSchema,
-       }, s.l, nil, s.pm, s.schemaRepo)
+       }, s.l, nil, s.pm, s.schemaRepo, nil)
 }
 
 func (s *queueSupplier) ResourceSchema(md *commonv1.Metadata) 
(resourceSchema.ResourceSchema, error) {
diff --git a/banyand/measure/metrics.go b/banyand/measure/metrics.go
index 243522a90..4bb0c172f 100644
--- a/banyand/measure/metrics.go
+++ b/banyand/measure/metrics.go
@@ -310,6 +310,7 @@ func (m *metrics) DeleteAll() {
 
 func (s *supplier) newMetrics(p common.Position) (storage.Metrics, 
observability.Factory) {
        factory := 
s.omr.With(measureScope.ConstLabels(meter.ToLabelPairs(common.DBLabelNames(), 
p.DBLabelValues())))
+       
s.queryMetrics.Store(newQueryMetrics(s.omr.With(measureScope.SubScope("query").ConstLabels(meter.ToLabelPairs(common.DBLabelNames(),
 p.DBLabelValues())))))
        return &metrics{
                totalWritten:               factory.NewCounter("total_written"),
                totalBatch:                 factory.NewCounter("total_batch"),
diff --git a/banyand/measure/query.go b/banyand/measure/query.go
index 18dc09049..794d6e6c6 100644
--- a/banyand/measure/query.go
+++ b/banyand/measure/query.go
@@ -23,6 +23,7 @@ import (
        "context"
        "fmt"
        "sort"
+       "time"
 
        "github.com/pkg/errors"
 
@@ -36,6 +37,7 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/index/posting/roaring"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
        "github.com/apache/skywalking-banyandb/pkg/query/model"
        resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
@@ -48,6 +50,8 @@ const (
 
 var nilResult = model.MeasureQueryResult(nil)
 
+var queryResultTracker = pool.RegisterTracker("measure.queryResult")
+
 // Query allow to retrieve measure data points.
 type Query interface {
        LoadGroup(name string) (resourceSchema.Group, bool)
@@ -77,11 +81,18 @@ type topNQueryOptions struct {
 }
 
 func (m *measure) Query(ctx context.Context, mqo model.MeasureQueryOptions) 
(mqr model.MeasureQueryResult, err error) {
-       if mqo.TimeRange == nil {
-               return nil, errors.New("invalid query options: timeRange are 
required")
-       }
-       if len(mqo.TagProjection) == 0 && len(mqo.FieldProjection) == 0 {
-               return nil, errors.New("invalid query options: tagProjection or 
fieldProjection is required")
+       startTime := time.Now()
+       defer func() {
+               if m.queryMetrics != nil {
+                       
m.queryMetrics.queryLatency.Observe(time.Since(startTime).Seconds())
+                       if err != nil {
+                               m.queryMetrics.queryErrors.Inc(1)
+                       }
+               }
+       }()
+
+       if validateErr := validateMeasureQueryOptions(mqo); validateErr != nil {
+               return nil, validateErr
        }
 
        var tsdb storage.TSDB[*tsTable, option]
@@ -103,13 +114,23 @@ func (m *measure) Query(ctx context.Context, mqo 
model.MeasureQueryOptions) (mqr
        if len(segments) < 1 {
                return nilResult, nil
        }
+       segmentsNeedRelease := true
+       defer func() {
+               if !segmentsNeedRelease {
+                       return
+               }
+               for i := range segments {
+                       segments[i].DecRef()
+               }
+       }()
 
        if m.schema.IndexMode {
+               segmentsNeedRelease = false
                return m.buildIndexQueryResult(ctx, mqo, segments)
        }
 
-       if len(mqo.Entities) < 1 {
-               return nil, errors.New("invalid query options: series is 
required")
+       if validateErr := validateMeasureEntities(mqo); validateErr != nil {
+               return nil, validateErr
        }
 
        series := make([]*pbv1.Series, len(mqo.Entities))
@@ -125,17 +146,20 @@ func (m *measure) Query(ctx context.Context, mqo 
model.MeasureQueryOptions) (mqr
                return nil, err
        }
        if len(sids) < 1 {
-               for i := range segments {
-                       segments[i].DecRef()
-               }
                return nilResult, nil
        }
        result := queryResult{
                ctx:              ctx,
+               qm:               m.queryMetrics,
                segments:         segments,
                tagProjection:    mqo.TagProjection,
                storedIndexValue: storedIndexValue,
        }
+       queryResultTracker.Acquire(&result)
+       if m.queryMetrics != nil {
+               m.queryMetrics.totalQueryResultStarted.Inc(1)
+       }
+       segmentsNeedRelease = false
        defer func() {
                if err != nil {
                        result.Release()
@@ -183,31 +207,53 @@ func (m *measure) Query(ctx context.Context, mqo 
model.MeasureQueryOptions) (mqr
                return nil, err
        }
 
+       applyMeasureQueryOrdering(mqo, &result)
+       applyTopNOptions(mqo, &result)
+
+       if m.queryMetrics != nil {
+               m.queryMetrics.resultPoints.Observe(float64(len(result.data)))
+       }
+
+       return &result, nil
+}
+
+func validateMeasureQueryOptions(mqo model.MeasureQueryOptions) error {
+       if mqo.TimeRange == nil {
+               return errors.New("invalid query options: timeRange are 
required")
+       }
+       if len(mqo.TagProjection) == 0 && len(mqo.FieldProjection) == 0 {
+               return errors.New("invalid query options: tagProjection or 
fieldProjection is required")
+       }
+       return nil
+}
+
+func validateMeasureEntities(mqo model.MeasureQueryOptions) error {
+       if len(mqo.Entities) < 1 {
+               return errors.New("invalid query options: series is required")
+       }
+       return nil
+}
+
+func applyMeasureQueryOrdering(mqo model.MeasureQueryOptions, result 
*queryResult) {
        if mqo.Order == nil {
                result.ascTS = true
                result.orderByTS = true
-       } else {
-               if mqo.Order.Sort == modelv1.Sort_SORT_ASC || mqo.Order.Sort == 
modelv1.Sort_SORT_UNSPECIFIED {
-                       result.ascTS = true
-               }
-               switch mqo.Order.Type {
-               case index.OrderByTypeTime:
-                       result.orderByTS = true
-               case index.OrderByTypeIndex:
-                       result.orderByTS = false
-               case index.OrderByTypeSeries:
-                       result.orderByTS = false
-               }
+               return
        }
-
-       if mqo.Name == TopNSchemaName {
-               result.topNQueryOptions = &topNQueryOptions{
-                       sortDirection: mqo.Sort,
-                       number:        mqo.Number,
-               }
+       if mqo.Order.Sort == modelv1.Sort_SORT_ASC || mqo.Order.Sort == 
modelv1.Sort_SORT_UNSPECIFIED {
+               result.ascTS = true
        }
+       result.orderByTS = mqo.Order.Type == index.OrderByTypeTime
+}
 
-       return &result, nil
+func applyTopNOptions(mqo model.MeasureQueryOptions, result *queryResult) {
+       if mqo.Name != TopNSchemaName {
+               return
+       }
+       result.topNQueryOptions = &topNQueryOptions{
+               sortDirection: mqo.Sort,
+               number:        mqo.Number,
+       }
 }
 
 type tagNameWithType struct {
@@ -694,6 +740,7 @@ func binaryDataFieldValue(value []byte) *modelv1.FieldValue 
{
 
 type queryResult struct {
        ctx              context.Context
+       qm               *queryMetrics
        topNQueryOptions *topNQueryOptions
        sidToIndex       map[common.SeriesID]int
        storedIndexValue map[common.SeriesID]map[string]*modelv1.TagValue
@@ -780,6 +827,10 @@ func (qr *queryResult) Pull() *model.MeasureResult {
 }
 
 func (qr *queryResult) Release() {
+       queryResultTracker.Release(qr)
+       if qr.qm != nil {
+               qr.qm.totalQueryResultFinished.Inc(1)
+       }
        for i, v := range qr.data {
                releaseBlockCursor(v)
                qr.data[i] = nil
diff --git a/banyand/query/processor.go b/banyand/query/processor.go
index c6efbbda2..4857eee4e 100644
--- a/banyand/query/processor.go
+++ b/banyand/query/processor.go
@@ -33,6 +33,7 @@ import (
        streamv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
        tracev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1"
        "github.com/apache/skywalking-banyandb/banyand/measure"
+       "github.com/apache/skywalking-banyandb/banyand/observability"
        "github.com/apache/skywalking-banyandb/banyand/stream"
        "github.com/apache/skywalking-banyandb/banyand/trace"
        "github.com/apache/skywalking-banyandb/pkg/bus"
@@ -432,6 +433,7 @@ type measureInternalQueryProcessor struct {
        measureService measure.Service
        *queryService
        *bus.UnImplementedHealthyListener
+       metricSvc observability.MetricsRegistry
 }
 
 func (p *measureInternalQueryProcessor) Rev(ctx context.Context, message 
bus.Message) (resp bus.Message) {
diff --git a/banyand/query/query.go b/banyand/query/query.go
index 4985f47ea..8d5a25ae9 100644
--- a/banyand/query/query.go
+++ b/banyand/query/query.go
@@ -29,6 +29,7 @@ import (
        "github.com/apache/skywalking-banyandb/api/data"
        "github.com/apache/skywalking-banyandb/banyand/measure"
        "github.com/apache/skywalking-banyandb/banyand/metadata"
+       "github.com/apache/skywalking-banyandb/banyand/observability"
        "github.com/apache/skywalking-banyandb/banyand/queue"
        "github.com/apache/skywalking-banyandb/banyand/stream"
        "github.com/apache/skywalking-banyandb/banyand/trace"
@@ -51,7 +52,7 @@ type queryService struct {
 
 // NewService return a new query service.
 func NewService(_ context.Context, streamService stream.Service, 
measureService measure.Service, traceService trace.Service,
-       metaService metadata.Repo, pipeline queue.Server,
+       metaService metadata.Repo, pipeline queue.Server, metricSvc 
observability.MetricsRegistry,
 ) (run.Unit, error) {
        svc := &queryService{
                metaService: metaService,
@@ -66,6 +67,7 @@ func NewService(_ context.Context, streamService 
stream.Service, measureService
        svc.imqp = &measureInternalQueryProcessor{
                measureService: measureService,
                queryService:   svc,
+               metricSvc:      metricSvc,
        }
        // stream query processor
        svc.sqp = &streamQueryProcessor{
diff --git a/banyand/queue/sub/chunked_sync.go 
b/banyand/queue/sub/chunked_sync.go
index 63d6137fa..07fa6eafe 100644
--- a/banyand/queue/sub/chunked_sync.go
+++ b/banyand/queue/sub/chunked_sync.go
@@ -128,9 +128,10 @@ func (s *server) SyncPart(stream 
clusterv1.ChunkedSyncService_SyncPartServer) er
        var sessionID string
        defer func() {
                if currentSession != nil {
-                       s.unregisterSession(currentSession.sessionID)
                        if currentSession.partCtx != nil {
-                               currentSession.partCtx.Close()
+                               if closeErr := currentSession.partCtx.Close(); 
closeErr != nil {
+                                       
s.log.Error().Err(closeErr).Str("session_id", 
currentSession.sessionID).Msg("failed to close session partCtx")
+                               }
                        }
                }
        }()
@@ -154,6 +155,21 @@ func (s *server) SyncPart(stream 
clusterv1.ChunkedSyncService_SyncPartServer) er
                sessionID = req.SessionId
 
                if req.GetMetadata() != nil {
+                       if currentSession != nil {
+                               if currentSession.partCtx != nil {
+                                       if currentSession.partCtx.Handler != 
nil {
+                                               if finishErr := 
currentSession.partCtx.Handler.FinishSync(); finishErr != nil {
+                                                       
s.updateChunkOrderMetrics("finish_sync_err", currentSession.sessionID)
+                                                       
s.log.Error().Err(finishErr).Str("session_id", 
currentSession.sessionID).Msg("failed to finish sync for previous session")
+                                               }
+                                               if closeErr := 
currentSession.partCtx.Close(); closeErr != nil {
+                                                       
s.log.Error().Err(closeErr).Str("session_id", 
currentSession.sessionID).Msg("failed to close previous session partCtx")
+                                               }
+                                       } else if closeErr := 
currentSession.partCtx.Close(); closeErr != nil {
+                                               
s.log.Error().Err(closeErr).Str("session_id", 
currentSession.sessionID).Msg("failed to close previous session partCtx")
+                                       }
+                               }
+                       }
                        currentSession = &syncSession{
                                sessionID:      sessionID,
                                metadata:       req.GetMetadata(),
@@ -161,7 +177,6 @@ func (s *server) SyncPart(stream 
clusterv1.ChunkedSyncService_SyncPartServer) er
                                chunksReceived: 0,
                                partsProgress:  make(map[int]*partProgress),
                        }
-                       s.registerSession(sessionID, currentSession)
                        if dl := s.log.Debug(); dl.Enabled() {
                                dl.Str("session_id", sessionID).
                                        Str("topic", req.GetMetadata().Topic).
diff --git a/banyand/queue/sub/server.go b/banyand/queue/sub/server.go
index 69da73ffc..7f46ff08a 100644
--- a/banyand/queue/sub/server.go
+++ b/banyand/queue/sub/server.go
@@ -85,7 +85,6 @@ type server struct {
        listeners             map[bus.Topic][]bus.MessageListener
        topicMap              map[string]bus.Topic
        chunkedSyncHandlers   map[bus.Topic]queue.ChunkedSyncHandler
-       activeSessions        map[string]*syncSession
        log                   *logger.Logger
        httpSrv               *http.Server
        tlsReloader           *pkgtls.Reloader
@@ -101,7 +100,6 @@ type server struct {
        maxRecvMsgSize        run.Bytes
        listenersLock         sync.RWMutex
        routeTableProviderMu  sync.RWMutex
-       activeSessionsMu      sync.Mutex
        port                  uint32
        httpPort              uint32
        maxChunkBufferSize    uint32
@@ -121,7 +119,6 @@ func NewServerWithPorts(omr observability.MetricsRegistry, 
flagNamePrefix string
                listeners:           make(map[bus.Topic][]bus.MessageListener),
                topicMap:            make(map[string]bus.Topic),
                chunkedSyncHandlers: 
make(map[bus.Topic]queue.ChunkedSyncHandler),
-               activeSessions:      make(map[string]*syncSession),
                omr:                 omr,
                maxRecvMsgSize:      defaultRecvSize,
                flagNamePrefix:      flagNamePrefix,
@@ -381,39 +378,6 @@ func (s *server) GracefulStop() {
                t.Stop()
                s.log.Info().Msg("stopped gracefully")
        }
-
-       s.closeAllSessions()
-}
-
-// registerSession adds a session to the active sessions map.
-func (s *server) registerSession(id string, session *syncSession) {
-       s.activeSessionsMu.Lock()
-       s.activeSessions[id] = session
-       s.activeSessionsMu.Unlock()
-}
-
-// unregisterSession removes a session from the active sessions map.
-func (s *server) unregisterSession(id string) {
-       s.activeSessionsMu.Lock()
-       delete(s.activeSessions, id)
-       s.activeSessionsMu.Unlock()
-}
-
-// closeAllSessions closes the partCtx of every remaining active session.
-// It is called after the gRPC server has fully stopped as a safety net.
-func (s *server) closeAllSessions() {
-       s.activeSessionsMu.Lock()
-       sessions := s.activeSessions
-       s.activeSessions = make(map[string]*syncSession)
-       s.activeSessionsMu.Unlock()
-
-       for id, session := range sessions {
-               if session.partCtx != nil {
-                       if closeErr := session.partCtx.Close(); closeErr != nil 
{
-                               s.log.Error().Err(closeErr).Str("session_id", 
id).Msg("failed to close session partCtx during shutdown")
-                       }
-               }
-       }
 }
 
 // RegisterChunkedSyncHandler implements queue.Server.
@@ -444,6 +408,7 @@ type metrics struct {
        bufferTimeouts           meter.Counter
        largeGapsRejected        meter.Counter
        bufferCapacityExceeded   meter.Counter
+       finishSyncErr            meter.Counter
 }
 
 func newMetrics(factory observability.Factory) *metrics {
@@ -463,11 +428,12 @@ func newMetrics(factory observability.Factory) *metrics {
                bufferTimeouts:           factory.NewCounter("buffer_timeouts", 
"session_id"),
                largeGapsRejected:        
factory.NewCounter("large_gaps_rejected", "session_id"),
                bufferCapacityExceeded:   
factory.NewCounter("buffer_capacity_exceeded", "session_id"),
+               finishSyncErr:            factory.NewCounter("finish_sync_err", 
"session_id"),
        }
 }
 
 // updateChunkOrderMetrics updates chunk ordering metrics.
-func (s *server) updateChunkOrderMetrics(event string, sessionID string) {
+func (s *server) updateChunkOrderMetrics(event, sessionID string) {
        if s.metrics == nil {
                return // Skip metrics if not initialized (e.g., during tests)
        }
@@ -482,5 +448,7 @@ func (s *server) updateChunkOrderMetrics(event string, 
sessionID string) {
                s.metrics.largeGapsRejected.Inc(1, sessionID)
        case "buffer_full":
                s.metrics.bufferCapacityExceeded.Inc(1, sessionID)
+       case "finish_sync_err":
+               s.metrics.finishSyncErr.Inc(1, sessionID)
        }
 }
diff --git a/banyand/stream/query.go b/banyand/stream/query.go
index dfa8e5472..bbcccbaf7 100644
--- a/banyand/stream/query.go
+++ b/banyand/stream/query.go
@@ -35,11 +35,14 @@ import (
        itersort "github.com/apache/skywalking-banyandb/pkg/iter/sort"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
        logicalstream 
"github.com/apache/skywalking-banyandb/pkg/query/logical/stream"
        "github.com/apache/skywalking-banyandb/pkg/query/model"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
+var streamQueryResultTracker = pool.RegisterTracker("stream.queryResult")
+
 const checkDoneEvery = 128
 
 func (s *stream) Query(ctx context.Context, sqo model.StreamQueryOptions) (sqr 
model.StreamQueryResult, err error) {
@@ -60,9 +63,13 @@ func (s *stream) Query(ctx context.Context, sqo 
model.StreamQueryOptions) (sqr m
                return bypassQueryResultInstance, nil
        }
 
+       segmentsNeedRelease := true
        defer func() {
-               if err != nil {
-                       sqr.Release()
+               if !segmentsNeedRelease {
+                       return
+               }
+               for i := range segments {
+                       segments[i].DecRef()
                }
        }()
 
@@ -82,10 +89,17 @@ func (s *stream) Query(ctx context.Context, sqo 
model.StreamQueryOptions) (sqr m
        tr := index.NewIntRangeOpts(qo.minTimestamp, qo.maxTimestamp, true, 
true)
 
        if sqo.Order == nil || sqo.Order.Index == nil {
-               return s.executeTimeSeriesQuery(segments, series, qo, &tr), nil
+               sqr = s.executeTimeSeriesQuery(segments, series, qo, &tr)
+               segmentsNeedRelease = false
+               return sqr, nil
        }
 
-       return s.executeIndexedQuery(ctx, segments, series, sqo, 
schemaTagTypes, &tr)
+       sqr, err = s.executeIndexedQuery(ctx, segments, series, sqo, 
schemaTagTypes, &tr)
+       if err != nil {
+               return nil, err
+       }
+       segmentsNeedRelease = false
+       return sqr, nil
 }
 
 func validateQueryInput(sqo model.StreamQueryOptions) error {
@@ -157,6 +171,7 @@ func (s *stream) executeTimeSeriesQuery(
                result.asc = true
        }
 
+       streamQueryResultTracker.Acquire(result)
        return result
 }
 
@@ -198,6 +213,7 @@ func (s *stream) executeIndexedQuery(
                result.asc = true
        }
 
+       streamQueryResultTracker.Acquire(&result)
        return &result, nil
 }
 
diff --git a/banyand/stream/query_by_idx.go b/banyand/stream/query_by_idx.go
index 242a76e9d..09e96316d 100644
--- a/banyand/stream/query_by_idx.go
+++ b/banyand/stream/query_by_idx.go
@@ -277,6 +277,7 @@ func (qr *idxResult) releaseBlockCursor() {
 }
 
 func (qr *idxResult) Release() {
+       streamQueryResultTracker.Release(qr)
        qr.releaseParts()
        for i := range qr.segments {
                qr.segments[i].DecRef()
diff --git a/banyand/stream/query_by_ts.go b/banyand/stream/query_by_ts.go
index 5674cd1cf..7c58e6fd8 100644
--- a/banyand/stream/query_by_ts.go
+++ b/banyand/stream/query_by_ts.go
@@ -192,6 +192,7 @@ func loadBlockCursor(bc *blockCursor, tmpBlock *block, qo 
queryOptions, is index
 }
 
 func (t *tsResult) Release() {
+       streamQueryResultTracker.Release(t)
        if t.ts != nil {
                t.ts.close()
        }
diff --git a/banyand/stream/stream_suite_test.go 
b/banyand/stream/stream_suite_test.go
index 0ca524649..853bb5578 100644
--- a/banyand/stream/stream_suite_test.go
+++ b/banyand/stream/stream_suite_test.go
@@ -82,7 +82,7 @@ func setUp() (*services, func()) {
        streamService, err := stream.NewService(metadataService, pipeline, 
metricSvc, pm, nil)
        gomega.Expect(err).NotTo(gomega.HaveOccurred())
        preloadStreamSvc := &preloadStreamService{metaSvc: metadataService}
-       querySvc, err := query.NewService(context.TODO(), streamService, nil, 
nil, metadataService, pipeline)
+       querySvc, err := query.NewService(context.TODO(), streamService, nil, 
nil, metadataService, pipeline, metricSvc)
        gomega.Expect(err).NotTo(gomega.HaveOccurred())
        var flags []string
        metaPath, metaDeferFunc, err := test.NewSpace()
diff --git a/banyand/trace/query.go b/banyand/trace/query.go
index 5fdbb6d55..801b70872 100644
--- a/banyand/trace/query.go
+++ b/banyand/trace/query.go
@@ -34,9 +34,12 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/encoding"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
        "github.com/apache/skywalking-banyandb/pkg/query/model"
 )
 
+var traceQueryResultTracker = pool.RegisterTracker("trace.queryResult")
+
 const (
        checkDoneEvery = 128
        queryTimeout   = 20 * time.Second
@@ -69,12 +72,22 @@ func (t *trace) Query(ctx context.Context, tqo 
model.TraceQueryOptions) (model.T
        if len(segments) < 1 {
                return nilResult, nil
        }
+       segmentsNeedRelease := true
+       defer func() {
+               if !segmentsNeedRelease {
+                       return
+               }
+               for i := range segments {
+                       segments[i].DecRef()
+               }
+       }()
 
        result := queryResult{
                ctx:           ctx,
                segments:      segments,
                tagProjection: tqo.TagProjection,
        }
+       segmentsNeedRelease = false
        defer func() {
                if err != nil {
                        result.Release()
@@ -105,6 +118,7 @@ func (t *trace) Query(ctx context.Context, tqo 
model.TraceQueryOptions) (model.T
 
        sidxInstances, sidxQueryRequest, useSIDXStreaming := 
t.prepareSIDXStreaming(tqo, qo, tables)
        if len(qo.traceIDs) == 0 && !useSIDXStreaming {
+               result.Release()
                return nilResult, nil
        }
 
@@ -131,6 +145,7 @@ func (t *trace) Query(ctx context.Context, tqo 
model.TraceQueryOptions) (model.T
 
        result.cursorBatchCh = t.startBlockScanStage(pipelineCtx, tables, qo, 
traceBatchCh)
 
+       traceQueryResultTracker.Acquire(&result)
        return &result, nil
 }
 
@@ -484,6 +499,7 @@ func (qr *queryResult) releaseCurrentBatch() {
 }
 
 func (qr *queryResult) Release() {
+       traceQueryResultTracker.Release(qr)
        if qr.cancel != nil {
                qr.cancel()
        }
@@ -494,23 +510,25 @@ func (qr *queryResult) Release() {
        }
 
        // Drain all batches and their cursor channels to ensure 
scanTraceIDsInline completes
-       for batch := range qr.cursorBatchCh {
-               if batch != nil {
-                       if batch.cursorCh != nil {
-                               // Drain the cursor channel to ensure 
scanTraceIDsInline goroutine finishes
-                               for result := range batch.cursorCh {
-                                       if result.cursor != nil {
-                                               
releaseBlockCursor(result.cursor)
+       if qr.cursorBatchCh != nil {
+               for batch := range qr.cursorBatchCh {
+                       if batch != nil {
+                               if batch.cursorCh != nil {
+                                       // Drain the cursor channel to ensure 
scanTraceIDsInline goroutine finishes
+                                       for result := range batch.cursorCh {
+                                               if result.cursor != nil {
+                                                       
releaseBlockCursor(result.cursor)
+                                               }
                                        }
                                }
-                       }
-                       // Release snapshots from drained batches
-                       for _, s := range batch.snapshots {
-                               s.decRef()
+                               // Release snapshots from drained batches
+                               for _, s := range batch.snapshots {
+                                       s.decRef()
+                               }
                        }
                }
+               qr.cursorBatchCh = nil
        }
-       qr.cursorBatchCh = nil
 
        qr.releaseCurrentBatch()
 
diff --git a/banyand/trace/trace_suite_test.go 
b/banyand/trace/trace_suite_test.go
index 071049b20..daefee2bd 100644
--- a/banyand/trace/trace_suite_test.go
+++ b/banyand/trace/trace_suite_test.go
@@ -82,7 +82,7 @@ func setUp() (*services, func()) {
        gomega.Expect(err).NotTo(gomega.HaveOccurred())
        preloadTraceSvc := &preloadTraceService{metaSvc: metadataService}
        // Init Query Service for trace queries
-       querySvc, err := query.NewService(context.TODO(), nil, nil, 
traceService, metadataService, pipeline)
+       querySvc, err := query.NewService(context.TODO(), nil, nil, 
traceService, metadataService, pipeline, metricSvc)
        gomega.Expect(err).NotTo(gomega.HaveOccurred())
        var flags []string
        metaPath, metaDeferFunc, err := test.NewSpace()
diff --git a/pkg/cmdsetup/data.go b/pkg/cmdsetup/data.go
index 243e904f0..04f27fadd 100644
--- a/pkg/cmdsetup/data.go
+++ b/pkg/cmdsetup/data.go
@@ -76,7 +76,7 @@ func newDataCmd(runners ...run.Unit) *cobra.Command {
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate trace service")
        }
-       q, err := query.NewService(ctx, streamSvc, measureSvc, traceSvc, 
metaSvc, pipeline)
+       q, err := query.NewService(ctx, streamSvc, measureSvc, traceSvc, 
metaSvc, pipeline, metricSvc)
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate query processor")
        }
diff --git a/pkg/cmdsetup/standalone.go b/pkg/cmdsetup/standalone.go
index 7ebdfab10..d8fdd4afc 100644
--- a/pkg/cmdsetup/standalone.go
+++ b/pkg/cmdsetup/standalone.go
@@ -73,7 +73,7 @@ func newStandaloneCmd(runners ...run.Unit) *cobra.Command {
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate measure service")
        }
-       q, err := query.NewService(ctx, streamSvc, measureSvc, traceSvc, 
metaSvc, dataPipeline)
+       q, err := query.NewService(ctx, streamSvc, measureSvc, traceSvc, 
metaSvc, dataPipeline, metricSvc)
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate query processor")
        }
diff --git a/pkg/pool/pool.go b/pkg/pool/pool.go
index c2b275fd8..d928032ba 100644
--- a/pkg/pool/pool.go
+++ b/pkg/pool/pool.go
@@ -30,11 +30,6 @@ var (
        stackTrackingEnabled atomic.Bool
 )
 
-// EnableStackTracking enables or disables stack tracking for all pools.
-func EnableStackTracking(enabled bool) {
-       stackTrackingEnabled.Store(enabled)
-}
-
 // Register registers a new pool with the given name.
 func Register[T any](name string) *Synced[T] {
        p := new(Synced[T])
@@ -44,31 +39,6 @@ func Register[T any](name string) *Synced[T] {
        return p
 }
 
-// AllRefsCount returns the reference count of all pools.
-func AllRefsCount() map[string]int {
-       result := make(map[string]int)
-       poolMap.Range(func(key, value any) bool {
-               result[key.(string)] = value.(Trackable).RefsCount()
-               return true
-       })
-       return result
-}
-
-// AllStacks returns all recorded stack traces for leaked objects from all 
pools.
-func AllStacks() map[string][]string {
-       result := make(map[string][]string)
-       poolMap.Range(func(key, value any) bool {
-               if st, ok := value.(StackTracker); ok {
-                       stacks := st.Stacks()
-                       if len(stacks) > 0 {
-                               result[key.(string)] = stacks
-                       }
-               }
-               return true
-       })
-       return result
-}
-
 // Trackable is the interface that wraps the RefsCount method.
 type Trackable interface {
        // RefsCount returns the reference count of the pool.
diff --git a/pkg/pool/pool_debug.go b/pkg/pool/pool_debug.go
new file mode 100644
index 000000000..dd62b02b2
--- /dev/null
+++ b/pkg/pool/pool_debug.go
@@ -0,0 +1,50 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//go:build !slim
+
+package pool
+
+// EnableStackTracking enables or disables stack tracking for all pools and trackers.
+func EnableStackTracking(enabled bool) {
+       stackTrackingEnabled.Store(enabled) // package-level atomic flag read by Tracker.Acquire
+}
+
+// AllRefsCount reports the reference count of every entry in the registry,
+// keyed by the name the entry was registered under.
+func AllRefsCount() map[string]int {
+       counts := map[string]int{}
+       poolMap.Range(func(name, entry any) bool {
+               counts[name.(string)] = entry.(Trackable).RefsCount()
+               return true
+       })
+       return counts
+}
+
+// AllStacks gathers the stack traces recorded for leaked objects, keyed by
+// registry name. Entries that are not StackTrackers, or that currently hold
+// no stacks, are left out of the result.
+func AllStacks() map[string][]string {
+       leaked := map[string][]string{}
+       poolMap.Range(func(name, entry any) bool {
+               tracker, ok := entry.(StackTracker)
+               if !ok {
+                       return true
+               }
+               if traces := tracker.Stacks(); len(traces) > 0 {
+                       leaked[name.(string)] = traces
+               }
+               return true
+       })
+       return leaked
+}
diff --git a/pkg/pool/pool_nop.go b/pkg/pool/pool_nop.go
new file mode 100644
index 000000000..6aeb1bd62
--- /dev/null
+++ b/pkg/pool/pool_nop.go
@@ -0,0 +1,29 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//go:build slim
+
+package pool
+
+// EnableStackTracking is a no-op in slim builds; the enabled argument is ignored.
+func EnableStackTracking(enabled bool) {}
+
+// AllRefsCount always returns a nil map in slim builds.
+func AllRefsCount() map[string]int { return nil }
+
+// AllStacks always returns a nil map in slim builds.
+func AllStacks() map[string][]string { return nil }
diff --git a/pkg/pool/tracker.go b/pkg/pool/tracker.go
new file mode 100644
index 000000000..5a31f1ea4
--- /dev/null
+++ b/pkg/pool/tracker.go
@@ -0,0 +1,121 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//go:build !slim
+
+package pool
+
+import (
+       "fmt"
+       "runtime"
+       "sync"
+       "sync/atomic"
+)
+
+// Tracker tracks object lifecycle (Acquire/Release) without pooling.
+// It integrates with AllRefsCount/AllStacks to help detect leaked resources.
+type Tracker struct {
+       stacks     map[uint64]string // stack trace text per id; created lazily by Acquire
+       idMap      map[any]uint64    // tracked object -> id used as the key into stacks
+       active     sync.Map          // set of objects acquired and not yet released
+       idCounter  atomic.Uint64     // source of the ids stored in idMap
+       stacksLock sync.Mutex        // guards stacks and idMap
+       refs       atomic.Int32      // count of objects currently held in active
+}
+
+// RegisterTracker creates a Tracker and stores it in the shared registry under
+// name. Registering the same name twice panics.
+func RegisterTracker(name string) *Tracker {
+       tracker := &Tracker{}
+       _, taken := poolMap.LoadOrStore(name, tracker)
+       if taken {
+               panic(fmt.Sprintf("duplicated tracker: %s", name))
+       }
+       return tracker
+}
+
+// Acquire records obj as live and bumps the reference count. A nil interface
+// value is ignored. When stack tracking is enabled, the calling goroutine's
+// stack (up to 4096 bytes) is stored so a leak can be traced to its origin.
+func (t *Tracker) Acquire(obj any) {
+       if obj == nil {
+               return
+       }
+       _, alreadyActive := t.active.LoadOrStore(obj, struct{}{})
+       if alreadyActive {
+               // A second Acquire for the same object is a caller bug; ignore it
+               // so the counter is not inflated.
+               return
+       }
+       t.refs.Add(1)
+       if !stackTrackingEnabled.Load() {
+               return
+       }
+
+       t.stacksLock.Lock()
+       defer t.stacksLock.Unlock()
+       if t.stacks == nil {
+               // Both maps are created together on first use.
+               t.stacks = make(map[uint64]string)
+               t.idMap = make(map[any]uint64)
+       }
+       id := t.idCounter.Add(1)
+       trace := make([]byte, 4096)
+       written := runtime.Stack(trace, false)
+       t.idMap[obj] = id
+       t.stacks[id] = "Tracker.Acquire() called:\n" + string(trace[:written])
+}
+
+// Release marks an object as released.
+//
+// A nil obj, an object that was never acquired, and a repeated Release are
+// all ignored, so the reference count cannot go negative.
+func (t *Tracker) Release(obj any) {
+       if obj == nil {
+               return
+       }
+       if _, loaded := t.active.LoadAndDelete(obj); !loaded {
+               return
+       }
+       t.refs.Add(-1)
+
+       // Clean up even when stack tracking is currently off: the stack may have
+       // been recorded while it was on, and skipping this step would leave a
+       // stale entry that Stacks keeps reporting as a leak (and that keeps obj
+       // reachable through idMap). Looking up a key in a nil idMap is safe.
+       t.stacksLock.Lock()
+       defer t.stacksLock.Unlock()
+       if id, ok := t.idMap[obj]; ok {
+               delete(t.stacks, id)
+               delete(t.idMap, obj)
+       }
+}
+
+// RefsCount returns the number of objects acquired but not yet released.
+func (t *Tracker) RefsCount() int {
+       return int(t.refs.Load())
+}
+
+// Stacks returns a snapshot of the stack traces of objects that were acquired
+// but not yet released. It returns nil if no stack has ever been recorded;
+// the order of the traces is unspecified.
+func (t *Tracker) Stacks() []string {
+       t.stacksLock.Lock()
+       defer t.stacksLock.Unlock()
+
+       if t.stacks == nil {
+               return nil
+       }
+
+       traces := make([]string, len(t.stacks))
+       next := 0
+       for id := range t.stacks {
+               traces[next] = t.stacks[id]
+               next++
+       }
+       return traces
+}
diff --git a/pkg/pool/tracker_stub.go b/pkg/pool/tracker_stub.go
new file mode 100644
index 000000000..6ba94dda4
--- /dev/null
+++ b/pkg/pool/tracker_stub.go
@@ -0,0 +1,40 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//go:build slim
+
+package pool
+
+// Tracker is a no-op stub in slim builds; it carries no state.
+type Tracker struct{}
+
+// Acquire is a no-op in slim builds; the object is not recorded.
+func (t *Tracker) Acquire(any) {}
+
+// Release is a no-op in slim builds; the object is ignored.
+func (t *Tracker) Release(any) {}
+
+// RefsCount always returns 0 in slim builds.
+func (t *Tracker) RefsCount() int { return 0 }
+
+// Stacks always returns nil in slim builds.
+func (t *Tracker) Stacks() []string { return nil }
+
+// RegisterTracker hands back a fresh no-op Tracker. In slim builds the name
+// is ignored: nothing is registered and duplicate names do not panic.
+func RegisterTracker(name string) *Tracker {
+       return &Tracker{}
+}
diff --git a/pkg/query/logical/measure/measure_plan_distributed.go 
b/pkg/query/logical/measure/measure_plan_distributed.go
index 69fce0659..d229d99e6 100644
--- a/pkg/query/logical/measure/measure_plan_distributed.go
+++ b/pkg/query/logical/measure/measure_plan_distributed.go
@@ -429,6 +429,7 @@ type sortedMIterator struct {
        uniqueData  map[uint64]*measurev1.InternalDataPoint
        cur         *measurev1.InternalDataPoint
        initialized bool
+       exhausted   bool
        closed      bool
 }
 
@@ -438,7 +439,7 @@ func (s *sortedMIterator) init() {
        }
        s.initialized = true
        if !s.Iterator.Next() {
-               s.closed = true
+               s.exhausted = true
                return
        }
        s.data = list.New()
@@ -463,7 +464,7 @@ func (s *sortedMIterator) Next() bool {
 }
 
 func (s *sortedMIterator) loadDps() {
-       if s.closed {
+       if s.exhausted {
                return
        }
        for k := range s.uniqueData {
@@ -473,7 +474,7 @@ func (s *sortedMIterator) loadDps() {
        s.uniqueData[hashDataPoint(first.GetDataPoint())] = 
first.InternalDataPoint
        for {
                if !s.Iterator.Next() {
-                       s.closed = true
+                       s.exhausted = true
                        break
                }
                v := s.Iterator.Val()
@@ -500,7 +501,18 @@ func (s *sortedMIterator) Current() 
[]*measurev1.InternalDataPoint {
 }
 
 func (s *sortedMIterator) Close() error {
-       return nil
+       if s.closed {
+               return nil
+       }
+       s.closed = true
+       s.exhausted = true
+       s.data = nil
+       s.uniqueData = nil
+       s.cur = nil
+       if s.Iterator == nil {
+               return nil
+       }
+       return s.Iterator.Close()
 }
 
 const (


Reply via email to