This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new e975936d6 fix query segment ref release and chunked sync cleanup
(#1013)
e975936d6 is described below
commit e975936d684c5c341bf59d8b187d8a4af410bbaf
Author: Gao Hongtao <[email protected]>
AuthorDate: Thu Mar 19 17:07:53 2026 +0800
fix query segment ref release and chunked sync cleanup (#1013)
* fix query segment ref release and chunked sync cleanup
---------
Signed-off-by: Gao Hongtao <[email protected]>
---
CHANGES.md | 1 +
banyand/measure/measure.go | 57 +++++++---
banyand/measure/measure_suite_test.go | 2 +-
banyand/measure/metadata.go | 24 ++--
banyand/measure/metrics.go | 1 +
banyand/measure/query.go | 109 ++++++++++++++-----
banyand/query/processor.go | 2 +
banyand/query/query.go | 4 +-
banyand/queue/sub/chunked_sync.go | 21 +++-
banyand/queue/sub/server.go | 42 +------
banyand/stream/query.go | 24 +++-
banyand/stream/query_by_idx.go | 1 +
banyand/stream/query_by_ts.go | 1 +
banyand/stream/stream_suite_test.go | 2 +-
banyand/trace/query.go | 42 +++++--
banyand/trace/trace_suite_test.go | 2 +-
pkg/cmdsetup/data.go | 2 +-
pkg/cmdsetup/standalone.go | 2 +-
pkg/pool/pool.go | 30 -----
pkg/pool/pool_debug.go | 50 +++++++++
pkg/pool/pool_nop.go | 29 +++++
pkg/pool/tracker.go | 121 +++++++++++++++++++++
pkg/pool/tracker_stub.go | 40 +++++++
.../logical/measure/measure_plan_distributed.go | 20 +++-
24 files changed, 477 insertions(+), 152 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index b06b93753..76c802462 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -46,6 +46,7 @@ Release Notes.
- Fix measure standalone write handler resetting accumulated groups on error,
which dropped all successfully processed events in the batch.
- Fix memory part reference leak in mustAddMemPart when tsTable loop closes.
- Fix memory part leak in syncPartContext Close and prevent double-release in
FinishSync.
+- Fix segment reference leaks in measure/stream/trace queries and ensure
chunked sync sessions close part contexts correctly.
### Document
diff --git a/banyand/measure/measure.go b/banyand/measure/measure.go
index 29f5abcf1..4e36be0dd 100644
--- a/banyand/measure/measure.go
+++ b/banyand/measure/measure.go
@@ -26,9 +26,11 @@ import (
databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
"github.com/apache/skywalking-banyandb/banyand/internal/storage"
+ "github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/banyand/protector"
"github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/meter"
"github.com/apache/skywalking-banyandb/pkg/partition"
"github.com/apache/skywalking-banyandb/pkg/run"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
@@ -74,16 +76,17 @@ func (i *indexSchema) parse(schema *databasev1.Measure) {
}
type measure struct {
- indexSchema atomic.Value
- tsdb atomic.Value
- c storage.Cache
- pm protector.Memory
- l *logger.Logger
- schema *databasev1.Measure
- schemaRepo *schemaRepo
- name string
- group string
- interval time.Duration
+ indexSchema atomic.Value
+ tsdb atomic.Value
+ c storage.Cache
+ pm protector.Memory
+ l *logger.Logger
+ schema *databasev1.Measure
+ schemaRepo *schemaRepo
+ queryMetrics *queryMetrics
+ name string
+ group string
+ interval time.Duration
}
func (m *measure) GetSchema() *databasev1.Measure {
@@ -120,15 +123,37 @@ type measureSpec struct {
schema *databasev1.Measure
}
+type queryMetrics struct {
+ queryLatency meter.Histogram
+ queryErrors meter.Counter
+ resultPoints meter.Histogram
+ totalQueryResultStarted meter.Counter
+ totalQueryResultFinished meter.Counter
+}
+
+func newQueryMetrics(factory observability.Factory) *queryMetrics {
+ if factory == nil {
+ return nil
+ }
+ return &queryMetrics{
+ queryLatency: factory.NewHistogram("query_latency",
meter.DefBuckets),
+ queryErrors:
factory.NewCounter("total_query_errors"),
+ resultPoints: factory.NewHistogram("result_points",
meter.DefBuckets),
+ totalQueryResultStarted:
factory.NewCounter("total_query_result_started"),
+ totalQueryResultFinished:
factory.NewCounter("total_query_result_finished"),
+ }
+}
+
func openMeasure(spec measureSpec,
- l *logger.Logger, c storage.Cache, pm protector.Memory, schemaRepo
*schemaRepo,
+ l *logger.Logger, c storage.Cache, pm protector.Memory, schemaRepo
*schemaRepo, qm *queryMetrics,
) (*measure, error) {
m := &measure{
- schema: spec.schema,
- l: l,
- c: c,
- pm: pm,
- schemaRepo: schemaRepo,
+ schema: spec.schema,
+ l: l,
+ c: c,
+ pm: pm,
+ schemaRepo: schemaRepo,
+ queryMetrics: qm,
}
if err := m.parseSpec(); err != nil {
return nil, err
diff --git a/banyand/measure/measure_suite_test.go
b/banyand/measure/measure_suite_test.go
index 141d1e015..2f9792e73 100644
--- a/banyand/measure/measure_suite_test.go
+++ b/banyand/measure/measure_suite_test.go
@@ -81,7 +81,7 @@ func setUp() (*services, func()) {
measureService, err := measure.NewStandalone(metadataService, pipeline,
nil, metricSvc, pm)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
preloadMeasureSvc := &preloadMeasureService{metaSvc: metadataService}
- querySvc, err := query.NewService(context.TODO(), nil, measureService,
nil, metadataService, pipeline)
+ querySvc, err := query.NewService(context.TODO(), nil, measureService,
nil, metadataService, pipeline, metricSvc)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
var flags []string
diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go
index fd117050c..1517b8e68 100644
--- a/banyand/measure/metadata.go
+++ b/banyand/measure/metadata.go
@@ -24,6 +24,7 @@ import (
"strconv"
"strings"
"sync"
+ "sync/atomic"
"time"
"github.com/cenkalti/backoff/v4"
@@ -615,15 +616,16 @@ func (sr *schemaRepo)
stopAllProcessorsWithGroupPrefix(groupName string) {
var _ resourceSchema.ResourceSupplier = (*supplier)(nil)
type supplier struct {
- metadata metadata.Repo
- omr observability.MetricsRegistry
- c storage.Cache
- pm protector.Memory
- l *logger.Logger
- schemaRepo *schemaRepo
- nodeLabels map[string]string
- path string
- option option
+ metadata metadata.Repo
+ omr observability.MetricsRegistry
+ c storage.Cache
+ pm protector.Memory
+ l *logger.Logger
+ schemaRepo *schemaRepo
+ nodeLabels map[string]string
+ queryMetrics atomic.Pointer[queryMetrics]
+ path string
+ option option
}
func newSupplier(path string, svc *standalone, sr *schemaRepo, nodeLabels
map[string]string) *supplier {
@@ -654,7 +656,7 @@ func (s *supplier) OpenResource(spec
resourceSchema.Resource) (resourceSchema.In
measureSchema := spec.Schema().(*databasev1.Measure)
return openMeasure(measureSpec{
schema: measureSchema,
- }, s.l, s.c, s.pm, s.schemaRepo)
+ }, s.l, s.c, s.pm, s.schemaRepo, s.queryMetrics.Load())
}
func (s *supplier) ResourceSchema(md *commonv1.Metadata)
(resourceSchema.ResourceSchema, error) {
@@ -764,7 +766,7 @@ func (s *queueSupplier) OpenResource(spec
resourceSchema.Resource) (resourceSche
measureSchema := spec.Schema().(*databasev1.Measure)
return openMeasure(measureSpec{
schema: measureSchema,
- }, s.l, nil, s.pm, s.schemaRepo)
+ }, s.l, nil, s.pm, s.schemaRepo, nil)
}
func (s *queueSupplier) ResourceSchema(md *commonv1.Metadata)
(resourceSchema.ResourceSchema, error) {
diff --git a/banyand/measure/metrics.go b/banyand/measure/metrics.go
index 243522a90..4bb0c172f 100644
--- a/banyand/measure/metrics.go
+++ b/banyand/measure/metrics.go
@@ -310,6 +310,7 @@ func (m *metrics) DeleteAll() {
func (s *supplier) newMetrics(p common.Position) (storage.Metrics,
observability.Factory) {
factory :=
s.omr.With(measureScope.ConstLabels(meter.ToLabelPairs(common.DBLabelNames(),
p.DBLabelValues())))
+
s.queryMetrics.Store(newQueryMetrics(s.omr.With(measureScope.SubScope("query").ConstLabels(meter.ToLabelPairs(common.DBLabelNames(),
p.DBLabelValues())))))
return &metrics{
totalWritten: factory.NewCounter("total_written"),
totalBatch: factory.NewCounter("total_batch"),
diff --git a/banyand/measure/query.go b/banyand/measure/query.go
index 18dc09049..794d6e6c6 100644
--- a/banyand/measure/query.go
+++ b/banyand/measure/query.go
@@ -23,6 +23,7 @@ import (
"context"
"fmt"
"sort"
+ "time"
"github.com/pkg/errors"
@@ -36,6 +37,7 @@ import (
"github.com/apache/skywalking-banyandb/pkg/index/posting/roaring"
"github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+ "github.com/apache/skywalking-banyandb/pkg/pool"
"github.com/apache/skywalking-banyandb/pkg/query/model"
resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
@@ -48,6 +50,8 @@ const (
var nilResult = model.MeasureQueryResult(nil)
+var queryResultTracker = pool.RegisterTracker("measure.queryResult")
+
// Query allow to retrieve measure data points.
type Query interface {
LoadGroup(name string) (resourceSchema.Group, bool)
@@ -77,11 +81,18 @@ type topNQueryOptions struct {
}
func (m *measure) Query(ctx context.Context, mqo model.MeasureQueryOptions)
(mqr model.MeasureQueryResult, err error) {
- if mqo.TimeRange == nil {
- return nil, errors.New("invalid query options: timeRange are
required")
- }
- if len(mqo.TagProjection) == 0 && len(mqo.FieldProjection) == 0 {
- return nil, errors.New("invalid query options: tagProjection or
fieldProjection is required")
+ startTime := time.Now()
+ defer func() {
+ if m.queryMetrics != nil {
+
m.queryMetrics.queryLatency.Observe(time.Since(startTime).Seconds())
+ if err != nil {
+ m.queryMetrics.queryErrors.Inc(1)
+ }
+ }
+ }()
+
+ if validateErr := validateMeasureQueryOptions(mqo); validateErr != nil {
+ return nil, validateErr
}
var tsdb storage.TSDB[*tsTable, option]
@@ -103,13 +114,23 @@ func (m *measure) Query(ctx context.Context, mqo
model.MeasureQueryOptions) (mqr
if len(segments) < 1 {
return nilResult, nil
}
+ segmentsNeedRelease := true
+ defer func() {
+ if !segmentsNeedRelease {
+ return
+ }
+ for i := range segments {
+ segments[i].DecRef()
+ }
+ }()
if m.schema.IndexMode {
+ segmentsNeedRelease = false
return m.buildIndexQueryResult(ctx, mqo, segments)
}
- if len(mqo.Entities) < 1 {
- return nil, errors.New("invalid query options: series is
required")
+ if validateErr := validateMeasureEntities(mqo); validateErr != nil {
+ return nil, validateErr
}
series := make([]*pbv1.Series, len(mqo.Entities))
@@ -125,17 +146,20 @@ func (m *measure) Query(ctx context.Context, mqo
model.MeasureQueryOptions) (mqr
return nil, err
}
if len(sids) < 1 {
- for i := range segments {
- segments[i].DecRef()
- }
return nilResult, nil
}
result := queryResult{
ctx: ctx,
+ qm: m.queryMetrics,
segments: segments,
tagProjection: mqo.TagProjection,
storedIndexValue: storedIndexValue,
}
+ queryResultTracker.Acquire(&result)
+ if m.queryMetrics != nil {
+ m.queryMetrics.totalQueryResultStarted.Inc(1)
+ }
+ segmentsNeedRelease = false
defer func() {
if err != nil {
result.Release()
@@ -183,31 +207,53 @@ func (m *measure) Query(ctx context.Context, mqo
model.MeasureQueryOptions) (mqr
return nil, err
}
+ applyMeasureQueryOrdering(mqo, &result)
+ applyTopNOptions(mqo, &result)
+
+ if m.queryMetrics != nil {
+ m.queryMetrics.resultPoints.Observe(float64(len(result.data)))
+ }
+
+ return &result, nil
+}
+
+func validateMeasureQueryOptions(mqo model.MeasureQueryOptions) error {
+ if mqo.TimeRange == nil {
+ return errors.New("invalid query options: timeRange are
required")
+ }
+ if len(mqo.TagProjection) == 0 && len(mqo.FieldProjection) == 0 {
+ return errors.New("invalid query options: tagProjection or
fieldProjection is required")
+ }
+ return nil
+}
+
+func validateMeasureEntities(mqo model.MeasureQueryOptions) error {
+ if len(mqo.Entities) < 1 {
+ return errors.New("invalid query options: series is required")
+ }
+ return nil
+}
+
+func applyMeasureQueryOrdering(mqo model.MeasureQueryOptions, result
*queryResult) {
if mqo.Order == nil {
result.ascTS = true
result.orderByTS = true
- } else {
- if mqo.Order.Sort == modelv1.Sort_SORT_ASC || mqo.Order.Sort ==
modelv1.Sort_SORT_UNSPECIFIED {
- result.ascTS = true
- }
- switch mqo.Order.Type {
- case index.OrderByTypeTime:
- result.orderByTS = true
- case index.OrderByTypeIndex:
- result.orderByTS = false
- case index.OrderByTypeSeries:
- result.orderByTS = false
- }
+ return
}
-
- if mqo.Name == TopNSchemaName {
- result.topNQueryOptions = &topNQueryOptions{
- sortDirection: mqo.Sort,
- number: mqo.Number,
- }
+ if mqo.Order.Sort == modelv1.Sort_SORT_ASC || mqo.Order.Sort ==
modelv1.Sort_SORT_UNSPECIFIED {
+ result.ascTS = true
}
+ result.orderByTS = mqo.Order.Type == index.OrderByTypeTime
+}
- return &result, nil
+func applyTopNOptions(mqo model.MeasureQueryOptions, result *queryResult) {
+ if mqo.Name != TopNSchemaName {
+ return
+ }
+ result.topNQueryOptions = &topNQueryOptions{
+ sortDirection: mqo.Sort,
+ number: mqo.Number,
+ }
}
type tagNameWithType struct {
@@ -694,6 +740,7 @@ func binaryDataFieldValue(value []byte) *modelv1.FieldValue
{
type queryResult struct {
ctx context.Context
+ qm *queryMetrics
topNQueryOptions *topNQueryOptions
sidToIndex map[common.SeriesID]int
storedIndexValue map[common.SeriesID]map[string]*modelv1.TagValue
@@ -780,6 +827,10 @@ func (qr *queryResult) Pull() *model.MeasureResult {
}
func (qr *queryResult) Release() {
+ queryResultTracker.Release(qr)
+ if qr.qm != nil {
+ qr.qm.totalQueryResultFinished.Inc(1)
+ }
for i, v := range qr.data {
releaseBlockCursor(v)
qr.data[i] = nil
diff --git a/banyand/query/processor.go b/banyand/query/processor.go
index c6efbbda2..4857eee4e 100644
--- a/banyand/query/processor.go
+++ b/banyand/query/processor.go
@@ -33,6 +33,7 @@ import (
streamv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
tracev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1"
"github.com/apache/skywalking-banyandb/banyand/measure"
+ "github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/banyand/stream"
"github.com/apache/skywalking-banyandb/banyand/trace"
"github.com/apache/skywalking-banyandb/pkg/bus"
@@ -432,6 +433,7 @@ type measureInternalQueryProcessor struct {
measureService measure.Service
*queryService
*bus.UnImplementedHealthyListener
+ metricSvc observability.MetricsRegistry
}
func (p *measureInternalQueryProcessor) Rev(ctx context.Context, message
bus.Message) (resp bus.Message) {
diff --git a/banyand/query/query.go b/banyand/query/query.go
index 4985f47ea..8d5a25ae9 100644
--- a/banyand/query/query.go
+++ b/banyand/query/query.go
@@ -29,6 +29,7 @@ import (
"github.com/apache/skywalking-banyandb/api/data"
"github.com/apache/skywalking-banyandb/banyand/measure"
"github.com/apache/skywalking-banyandb/banyand/metadata"
+ "github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/banyand/stream"
"github.com/apache/skywalking-banyandb/banyand/trace"
@@ -51,7 +52,7 @@ type queryService struct {
// NewService return a new query service.
func NewService(_ context.Context, streamService stream.Service,
measureService measure.Service, traceService trace.Service,
- metaService metadata.Repo, pipeline queue.Server,
+ metaService metadata.Repo, pipeline queue.Server, metricSvc
observability.MetricsRegistry,
) (run.Unit, error) {
svc := &queryService{
metaService: metaService,
@@ -66,6 +67,7 @@ func NewService(_ context.Context, streamService
stream.Service, measureService
svc.imqp = &measureInternalQueryProcessor{
measureService: measureService,
queryService: svc,
+ metricSvc: metricSvc,
}
// stream query processor
svc.sqp = &streamQueryProcessor{
diff --git a/banyand/queue/sub/chunked_sync.go
b/banyand/queue/sub/chunked_sync.go
index 63d6137fa..07fa6eafe 100644
--- a/banyand/queue/sub/chunked_sync.go
+++ b/banyand/queue/sub/chunked_sync.go
@@ -128,9 +128,10 @@ func (s *server) SyncPart(stream
clusterv1.ChunkedSyncService_SyncPartServer) er
var sessionID string
defer func() {
if currentSession != nil {
- s.unregisterSession(currentSession.sessionID)
if currentSession.partCtx != nil {
- currentSession.partCtx.Close()
+ if closeErr := currentSession.partCtx.Close();
closeErr != nil {
+
s.log.Error().Err(closeErr).Str("session_id",
currentSession.sessionID).Msg("failed to close session partCtx")
+ }
}
}
}()
@@ -154,6 +155,21 @@ func (s *server) SyncPart(stream
clusterv1.ChunkedSyncService_SyncPartServer) er
sessionID = req.SessionId
if req.GetMetadata() != nil {
+ if currentSession != nil {
+ if currentSession.partCtx != nil {
+ if currentSession.partCtx.Handler !=
nil {
+ if finishErr :=
currentSession.partCtx.Handler.FinishSync(); finishErr != nil {
+
s.updateChunkOrderMetrics("finish_sync_err", currentSession.sessionID)
+
s.log.Error().Err(finishErr).Str("session_id",
currentSession.sessionID).Msg("failed to finish sync for previous session")
+ }
+ if closeErr :=
currentSession.partCtx.Close(); closeErr != nil {
+
s.log.Error().Err(closeErr).Str("session_id",
currentSession.sessionID).Msg("failed to close previous session partCtx")
+ }
+ } else if closeErr :=
currentSession.partCtx.Close(); closeErr != nil {
+
s.log.Error().Err(closeErr).Str("session_id",
currentSession.sessionID).Msg("failed to close previous session partCtx")
+ }
+ }
+ }
currentSession = &syncSession{
sessionID: sessionID,
metadata: req.GetMetadata(),
@@ -161,7 +177,6 @@ func (s *server) SyncPart(stream
clusterv1.ChunkedSyncService_SyncPartServer) er
chunksReceived: 0,
partsProgress: make(map[int]*partProgress),
}
- s.registerSession(sessionID, currentSession)
if dl := s.log.Debug(); dl.Enabled() {
dl.Str("session_id", sessionID).
Str("topic", req.GetMetadata().Topic).
diff --git a/banyand/queue/sub/server.go b/banyand/queue/sub/server.go
index 69da73ffc..7f46ff08a 100644
--- a/banyand/queue/sub/server.go
+++ b/banyand/queue/sub/server.go
@@ -85,7 +85,6 @@ type server struct {
listeners map[bus.Topic][]bus.MessageListener
topicMap map[string]bus.Topic
chunkedSyncHandlers map[bus.Topic]queue.ChunkedSyncHandler
- activeSessions map[string]*syncSession
log *logger.Logger
httpSrv *http.Server
tlsReloader *pkgtls.Reloader
@@ -101,7 +100,6 @@ type server struct {
maxRecvMsgSize run.Bytes
listenersLock sync.RWMutex
routeTableProviderMu sync.RWMutex
- activeSessionsMu sync.Mutex
port uint32
httpPort uint32
maxChunkBufferSize uint32
@@ -121,7 +119,6 @@ func NewServerWithPorts(omr observability.MetricsRegistry,
flagNamePrefix string
listeners: make(map[bus.Topic][]bus.MessageListener),
topicMap: make(map[string]bus.Topic),
chunkedSyncHandlers:
make(map[bus.Topic]queue.ChunkedSyncHandler),
- activeSessions: make(map[string]*syncSession),
omr: omr,
maxRecvMsgSize: defaultRecvSize,
flagNamePrefix: flagNamePrefix,
@@ -381,39 +378,6 @@ func (s *server) GracefulStop() {
t.Stop()
s.log.Info().Msg("stopped gracefully")
}
-
- s.closeAllSessions()
-}
-
-// registerSession adds a session to the active sessions map.
-func (s *server) registerSession(id string, session *syncSession) {
- s.activeSessionsMu.Lock()
- s.activeSessions[id] = session
- s.activeSessionsMu.Unlock()
-}
-
-// unregisterSession removes a session from the active sessions map.
-func (s *server) unregisterSession(id string) {
- s.activeSessionsMu.Lock()
- delete(s.activeSessions, id)
- s.activeSessionsMu.Unlock()
-}
-
-// closeAllSessions closes the partCtx of every remaining active session.
-// It is called after the gRPC server has fully stopped as a safety net.
-func (s *server) closeAllSessions() {
- s.activeSessionsMu.Lock()
- sessions := s.activeSessions
- s.activeSessions = make(map[string]*syncSession)
- s.activeSessionsMu.Unlock()
-
- for id, session := range sessions {
- if session.partCtx != nil {
- if closeErr := session.partCtx.Close(); closeErr != nil
{
- s.log.Error().Err(closeErr).Str("session_id",
id).Msg("failed to close session partCtx during shutdown")
- }
- }
- }
}
// RegisterChunkedSyncHandler implements queue.Server.
@@ -444,6 +408,7 @@ type metrics struct {
bufferTimeouts meter.Counter
largeGapsRejected meter.Counter
bufferCapacityExceeded meter.Counter
+ finishSyncErr meter.Counter
}
func newMetrics(factory observability.Factory) *metrics {
@@ -463,11 +428,12 @@ func newMetrics(factory observability.Factory) *metrics {
bufferTimeouts: factory.NewCounter("buffer_timeouts",
"session_id"),
largeGapsRejected:
factory.NewCounter("large_gaps_rejected", "session_id"),
bufferCapacityExceeded:
factory.NewCounter("buffer_capacity_exceeded", "session_id"),
+ finishSyncErr: factory.NewCounter("finish_sync_err",
"session_id"),
}
}
// updateChunkOrderMetrics updates chunk ordering metrics.
-func (s *server) updateChunkOrderMetrics(event string, sessionID string) {
+func (s *server) updateChunkOrderMetrics(event, sessionID string) {
if s.metrics == nil {
return // Skip metrics if not initialized (e.g., during tests)
}
@@ -482,5 +448,7 @@ func (s *server) updateChunkOrderMetrics(event string,
sessionID string) {
s.metrics.largeGapsRejected.Inc(1, sessionID)
case "buffer_full":
s.metrics.bufferCapacityExceeded.Inc(1, sessionID)
+ case "finish_sync_err":
+ s.metrics.finishSyncErr.Inc(1, sessionID)
}
}
diff --git a/banyand/stream/query.go b/banyand/stream/query.go
index dfa8e5472..bbcccbaf7 100644
--- a/banyand/stream/query.go
+++ b/banyand/stream/query.go
@@ -35,11 +35,14 @@ import (
itersort "github.com/apache/skywalking-banyandb/pkg/iter/sort"
"github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+ "github.com/apache/skywalking-banyandb/pkg/pool"
logicalstream
"github.com/apache/skywalking-banyandb/pkg/query/logical/stream"
"github.com/apache/skywalking-banyandb/pkg/query/model"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
+var streamQueryResultTracker = pool.RegisterTracker("stream.queryResult")
+
const checkDoneEvery = 128
func (s *stream) Query(ctx context.Context, sqo model.StreamQueryOptions) (sqr
model.StreamQueryResult, err error) {
@@ -60,9 +63,13 @@ func (s *stream) Query(ctx context.Context, sqo
model.StreamQueryOptions) (sqr m
return bypassQueryResultInstance, nil
}
+ segmentsNeedRelease := true
defer func() {
- if err != nil {
- sqr.Release()
+ if !segmentsNeedRelease {
+ return
+ }
+ for i := range segments {
+ segments[i].DecRef()
}
}()
@@ -82,10 +89,17 @@ func (s *stream) Query(ctx context.Context, sqo
model.StreamQueryOptions) (sqr m
tr := index.NewIntRangeOpts(qo.minTimestamp, qo.maxTimestamp, true,
true)
if sqo.Order == nil || sqo.Order.Index == nil {
- return s.executeTimeSeriesQuery(segments, series, qo, &tr), nil
+ sqr = s.executeTimeSeriesQuery(segments, series, qo, &tr)
+ segmentsNeedRelease = false
+ return sqr, nil
}
- return s.executeIndexedQuery(ctx, segments, series, sqo,
schemaTagTypes, &tr)
+ sqr, err = s.executeIndexedQuery(ctx, segments, series, sqo,
schemaTagTypes, &tr)
+ if err != nil {
+ return nil, err
+ }
+ segmentsNeedRelease = false
+ return sqr, nil
}
func validateQueryInput(sqo model.StreamQueryOptions) error {
@@ -157,6 +171,7 @@ func (s *stream) executeTimeSeriesQuery(
result.asc = true
}
+ streamQueryResultTracker.Acquire(result)
return result
}
@@ -198,6 +213,7 @@ func (s *stream) executeIndexedQuery(
result.asc = true
}
+ streamQueryResultTracker.Acquire(&result)
return &result, nil
}
diff --git a/banyand/stream/query_by_idx.go b/banyand/stream/query_by_idx.go
index 242a76e9d..09e96316d 100644
--- a/banyand/stream/query_by_idx.go
+++ b/banyand/stream/query_by_idx.go
@@ -277,6 +277,7 @@ func (qr *idxResult) releaseBlockCursor() {
}
func (qr *idxResult) Release() {
+ streamQueryResultTracker.Release(qr)
qr.releaseParts()
for i := range qr.segments {
qr.segments[i].DecRef()
diff --git a/banyand/stream/query_by_ts.go b/banyand/stream/query_by_ts.go
index 5674cd1cf..7c58e6fd8 100644
--- a/banyand/stream/query_by_ts.go
+++ b/banyand/stream/query_by_ts.go
@@ -192,6 +192,7 @@ func loadBlockCursor(bc *blockCursor, tmpBlock *block, qo
queryOptions, is index
}
func (t *tsResult) Release() {
+ streamQueryResultTracker.Release(t)
if t.ts != nil {
t.ts.close()
}
diff --git a/banyand/stream/stream_suite_test.go
b/banyand/stream/stream_suite_test.go
index 0ca524649..853bb5578 100644
--- a/banyand/stream/stream_suite_test.go
+++ b/banyand/stream/stream_suite_test.go
@@ -82,7 +82,7 @@ func setUp() (*services, func()) {
streamService, err := stream.NewService(metadataService, pipeline,
metricSvc, pm, nil)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
preloadStreamSvc := &preloadStreamService{metaSvc: metadataService}
- querySvc, err := query.NewService(context.TODO(), streamService, nil,
nil, metadataService, pipeline)
+ querySvc, err := query.NewService(context.TODO(), streamService, nil,
nil, metadataService, pipeline, metricSvc)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
var flags []string
metaPath, metaDeferFunc, err := test.NewSpace()
diff --git a/banyand/trace/query.go b/banyand/trace/query.go
index 5fdbb6d55..801b70872 100644
--- a/banyand/trace/query.go
+++ b/banyand/trace/query.go
@@ -34,9 +34,12 @@ import (
"github.com/apache/skywalking-banyandb/pkg/encoding"
"github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+ "github.com/apache/skywalking-banyandb/pkg/pool"
"github.com/apache/skywalking-banyandb/pkg/query/model"
)
+var traceQueryResultTracker = pool.RegisterTracker("trace.queryResult")
+
const (
checkDoneEvery = 128
queryTimeout = 20 * time.Second
@@ -69,12 +72,22 @@ func (t *trace) Query(ctx context.Context, tqo
model.TraceQueryOptions) (model.T
if len(segments) < 1 {
return nilResult, nil
}
+ segmentsNeedRelease := true
+ defer func() {
+ if !segmentsNeedRelease {
+ return
+ }
+ for i := range segments {
+ segments[i].DecRef()
+ }
+ }()
result := queryResult{
ctx: ctx,
segments: segments,
tagProjection: tqo.TagProjection,
}
+ segmentsNeedRelease = false
defer func() {
if err != nil {
result.Release()
@@ -105,6 +118,7 @@ func (t *trace) Query(ctx context.Context, tqo
model.TraceQueryOptions) (model.T
sidxInstances, sidxQueryRequest, useSIDXStreaming :=
t.prepareSIDXStreaming(tqo, qo, tables)
if len(qo.traceIDs) == 0 && !useSIDXStreaming {
+ result.Release()
return nilResult, nil
}
@@ -131,6 +145,7 @@ func (t *trace) Query(ctx context.Context, tqo
model.TraceQueryOptions) (model.T
result.cursorBatchCh = t.startBlockScanStage(pipelineCtx, tables, qo,
traceBatchCh)
+ traceQueryResultTracker.Acquire(&result)
return &result, nil
}
@@ -484,6 +499,7 @@ func (qr *queryResult) releaseCurrentBatch() {
}
func (qr *queryResult) Release() {
+ traceQueryResultTracker.Release(qr)
if qr.cancel != nil {
qr.cancel()
}
@@ -494,23 +510,25 @@ func (qr *queryResult) Release() {
}
// Drain all batches and their cursor channels to ensure
scanTraceIDsInline completes
- for batch := range qr.cursorBatchCh {
- if batch != nil {
- if batch.cursorCh != nil {
- // Drain the cursor channel to ensure
scanTraceIDsInline goroutine finishes
- for result := range batch.cursorCh {
- if result.cursor != nil {
-
releaseBlockCursor(result.cursor)
+ if qr.cursorBatchCh != nil {
+ for batch := range qr.cursorBatchCh {
+ if batch != nil {
+ if batch.cursorCh != nil {
+ // Drain the cursor channel to ensure
scanTraceIDsInline goroutine finishes
+ for result := range batch.cursorCh {
+ if result.cursor != nil {
+
releaseBlockCursor(result.cursor)
+ }
}
}
- }
- // Release snapshots from drained batches
- for _, s := range batch.snapshots {
- s.decRef()
+ // Release snapshots from drained batches
+ for _, s := range batch.snapshots {
+ s.decRef()
+ }
}
}
+ qr.cursorBatchCh = nil
}
- qr.cursorBatchCh = nil
qr.releaseCurrentBatch()
diff --git a/banyand/trace/trace_suite_test.go
b/banyand/trace/trace_suite_test.go
index 071049b20..daefee2bd 100644
--- a/banyand/trace/trace_suite_test.go
+++ b/banyand/trace/trace_suite_test.go
@@ -82,7 +82,7 @@ func setUp() (*services, func()) {
gomega.Expect(err).NotTo(gomega.HaveOccurred())
preloadTraceSvc := &preloadTraceService{metaSvc: metadataService}
// Init Query Service for trace queries
- querySvc, err := query.NewService(context.TODO(), nil, nil,
traceService, metadataService, pipeline)
+ querySvc, err := query.NewService(context.TODO(), nil, nil,
traceService, metadataService, pipeline, metricSvc)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
var flags []string
metaPath, metaDeferFunc, err := test.NewSpace()
diff --git a/pkg/cmdsetup/data.go b/pkg/cmdsetup/data.go
index 243e904f0..04f27fadd 100644
--- a/pkg/cmdsetup/data.go
+++ b/pkg/cmdsetup/data.go
@@ -76,7 +76,7 @@ func newDataCmd(runners ...run.Unit) *cobra.Command {
if err != nil {
l.Fatal().Err(err).Msg("failed to initiate trace service")
}
- q, err := query.NewService(ctx, streamSvc, measureSvc, traceSvc,
metaSvc, pipeline)
+ q, err := query.NewService(ctx, streamSvc, measureSvc, traceSvc,
metaSvc, pipeline, metricSvc)
if err != nil {
l.Fatal().Err(err).Msg("failed to initiate query processor")
}
diff --git a/pkg/cmdsetup/standalone.go b/pkg/cmdsetup/standalone.go
index 7ebdfab10..d8fdd4afc 100644
--- a/pkg/cmdsetup/standalone.go
+++ b/pkg/cmdsetup/standalone.go
@@ -73,7 +73,7 @@ func newStandaloneCmd(runners ...run.Unit) *cobra.Command {
if err != nil {
l.Fatal().Err(err).Msg("failed to initiate measure service")
}
- q, err := query.NewService(ctx, streamSvc, measureSvc, traceSvc,
metaSvc, dataPipeline)
+ q, err := query.NewService(ctx, streamSvc, measureSvc, traceSvc,
metaSvc, dataPipeline, metricSvc)
if err != nil {
l.Fatal().Err(err).Msg("failed to initiate query processor")
}
diff --git a/pkg/pool/pool.go b/pkg/pool/pool.go
index c2b275fd8..d928032ba 100644
--- a/pkg/pool/pool.go
+++ b/pkg/pool/pool.go
@@ -30,11 +30,6 @@ var (
stackTrackingEnabled atomic.Bool
)
-// EnableStackTracking enables or disables stack tracking for all pools.
-func EnableStackTracking(enabled bool) {
- stackTrackingEnabled.Store(enabled)
-}
-
// Register registers a new pool with the given name.
func Register[T any](name string) *Synced[T] {
p := new(Synced[T])
@@ -44,31 +39,6 @@ func Register[T any](name string) *Synced[T] {
return p
}
-// AllRefsCount returns the reference count of all pools.
-func AllRefsCount() map[string]int {
- result := make(map[string]int)
- poolMap.Range(func(key, value any) bool {
- result[key.(string)] = value.(Trackable).RefsCount()
- return true
- })
- return result
-}
-
-// AllStacks returns all recorded stack traces for leaked objects from all
pools.
-func AllStacks() map[string][]string {
- result := make(map[string][]string)
- poolMap.Range(func(key, value any) bool {
- if st, ok := value.(StackTracker); ok {
- stacks := st.Stacks()
- if len(stacks) > 0 {
- result[key.(string)] = stacks
- }
- }
- return true
- })
- return result
-}
-
// Trackable is the interface that wraps the RefsCount method.
type Trackable interface {
// RefsCount returns the reference count of the pool.
diff --git a/pkg/pool/pool_debug.go b/pkg/pool/pool_debug.go
new file mode 100644
index 000000000..dd62b02b2
--- /dev/null
+++ b/pkg/pool/pool_debug.go
@@ -0,0 +1,50 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//go:build !slim
+
+package pool
+
+// EnableStackTracking enables or disables stack tracking for all pools.
+func EnableStackTracking(enabled bool) {
+ stackTrackingEnabled.Store(enabled)
+}
+
+// AllRefsCount returns the reference count of all pools.
+func AllRefsCount() map[string]int {
+ result := make(map[string]int)
+ poolMap.Range(func(key, value any) bool {
+ result[key.(string)] = value.(Trackable).RefsCount()
+ return true
+ })
+ return result
+}
+
+// AllStacks returns all recorded stack traces for leaked objects from all pools.
+func AllStacks() map[string][]string {
+ result := make(map[string][]string)
+ poolMap.Range(func(key, value any) bool {
+ if st, ok := value.(StackTracker); ok {
+ stacks := st.Stacks()
+ if len(stacks) > 0 {
+ result[key.(string)] = stacks
+ }
+ }
+ return true
+ })
+ return result
+}
diff --git a/pkg/pool/pool_nop.go b/pkg/pool/pool_nop.go
new file mode 100644
index 000000000..6aeb1bd62
--- /dev/null
+++ b/pkg/pool/pool_nop.go
@@ -0,0 +1,29 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//go:build slim
+
+package pool
+
+// EnableStackTracking is a no-op in slim builds.
+func EnableStackTracking(enabled bool) {}
+
+// AllRefsCount returns nil in slim builds.
+func AllRefsCount() map[string]int { return nil }
+
+// AllStacks returns nil in slim builds.
+func AllStacks() map[string][]string { return nil }
diff --git a/pkg/pool/tracker.go b/pkg/pool/tracker.go
new file mode 100644
index 000000000..5a31f1ea4
--- /dev/null
+++ b/pkg/pool/tracker.go
@@ -0,0 +1,121 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//go:build !slim
+
+package pool
+
+import (
+ "fmt"
+ "runtime"
+ "sync"
+ "sync/atomic"
+)
+
+// Tracker tracks object lifecycle (Acquire/Release) without pooling.
+// It integrates with AllRefsCount/AllStacks to help detect leaked resources.
+type Tracker struct {
+ stacks map[uint64]string
+ idMap map[any]uint64
+ active sync.Map
+ idCounter atomic.Uint64
+ stacksLock sync.Mutex
+ refs atomic.Int32
+}
+
+// RegisterTracker registers a new lifecycle tracker with the given name.
+func RegisterTracker(name string) *Tracker {
+ t := new(Tracker)
+ if _, ok := poolMap.LoadOrStore(name, t); ok {
+ panic(fmt.Sprintf("duplicated tracker: %s", name))
+ }
+ return t
+}
+
+// Acquire marks an object as acquired.
+func (t *Tracker) Acquire(obj any) {
+ if obj == nil {
+ return
+ }
+ if _, loaded := t.active.LoadOrStore(obj, struct{}{}); loaded {
+ // Double acquire is a logic bug, but avoid inflating refs.
+ return
+ }
+ t.refs.Add(1)
+
+ if !stackTrackingEnabled.Load() {
+ return
+ }
+
+ t.stacksLock.Lock()
+ if t.stacks == nil {
+ t.stacks = make(map[uint64]string)
+ t.idMap = make(map[any]uint64)
+ }
+ id := t.idCounter.Add(1)
+ buf := make([]byte, 4096)
+ n := runtime.Stack(buf, false)
+ t.idMap[obj] = id
+ t.stacks[id] = "Tracker.Acquire() called:\n" + string(buf[:n])
+ t.stacksLock.Unlock()
+}
+
+// Release marks an object as released.
+func (t *Tracker) Release(obj any) {
+ if obj == nil {
+ return
+ }
+ if _, loaded := t.active.LoadAndDelete(obj); !loaded {
+		// Avoid negative refs on double-release or releasing an untracked object.
+ return
+ }
+ t.refs.Add(-1)
+
+ if !stackTrackingEnabled.Load() {
+ return
+ }
+
+ t.stacksLock.Lock()
+ if t.idMap != nil {
+ if id, ok := t.idMap[obj]; ok {
+ delete(t.stacks, id)
+ delete(t.idMap, obj)
+ }
+ }
+ t.stacksLock.Unlock()
+}
+
+// RefsCount returns the reference count of tracked objects.
+func (t *Tracker) RefsCount() int {
+ return int(t.refs.Load())
+}
+
+// Stacks returns recorded stack traces for acquired-but-not-released objects.
+func (t *Tracker) Stacks() []string {
+ t.stacksLock.Lock()
+ defer t.stacksLock.Unlock()
+
+ if t.stacks == nil {
+ return nil
+ }
+
+ result := make([]string, 0, len(t.stacks))
+ for _, stack := range t.stacks {
+ result = append(result, stack)
+ }
+ return result
+}
diff --git a/pkg/pool/tracker_stub.go b/pkg/pool/tracker_stub.go
new file mode 100644
index 000000000..6ba94dda4
--- /dev/null
+++ b/pkg/pool/tracker_stub.go
@@ -0,0 +1,40 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//go:build slim
+
+package pool
+
+// Tracker is a no-op stub in slim builds.
+type Tracker struct{}
+
+// Acquire is a no-op in slim builds.
+func (t *Tracker) Acquire(any) {}
+
+// Release is a no-op in slim builds.
+func (t *Tracker) Release(any) {}
+
+// RefsCount returns 0 in slim builds.
+func (t *Tracker) RefsCount() int { return 0 }
+
+// Stacks returns nil in slim builds.
+func (t *Tracker) Stacks() []string { return nil }
+
+// RegisterTracker returns a no-op tracker.
+func RegisterTracker(name string) *Tracker {
+ return new(Tracker)
+}
diff --git a/pkg/query/logical/measure/measure_plan_distributed.go b/pkg/query/logical/measure/measure_plan_distributed.go
index 69fce0659..d229d99e6 100644
--- a/pkg/query/logical/measure/measure_plan_distributed.go
+++ b/pkg/query/logical/measure/measure_plan_distributed.go
@@ -429,6 +429,7 @@ type sortedMIterator struct {
uniqueData map[uint64]*measurev1.InternalDataPoint
cur *measurev1.InternalDataPoint
initialized bool
+ exhausted bool
closed bool
}
@@ -438,7 +439,7 @@ func (s *sortedMIterator) init() {
}
s.initialized = true
if !s.Iterator.Next() {
- s.closed = true
+ s.exhausted = true
return
}
s.data = list.New()
@@ -463,7 +464,7 @@ func (s *sortedMIterator) Next() bool {
}
func (s *sortedMIterator) loadDps() {
- if s.closed {
+ if s.exhausted {
return
}
for k := range s.uniqueData {
@@ -473,7 +474,7 @@ func (s *sortedMIterator) loadDps() {
s.uniqueData[hashDataPoint(first.GetDataPoint())] =
first.InternalDataPoint
for {
if !s.Iterator.Next() {
- s.closed = true
+ s.exhausted = true
break
}
v := s.Iterator.Val()
@@ -500,7 +501,18 @@ func (s *sortedMIterator) Current() []*measurev1.InternalDataPoint {
}
func (s *sortedMIterator) Close() error {
- return nil
+ if s.closed {
+ return nil
+ }
+ s.closed = true
+ s.exhausted = true
+ s.data = nil
+ s.uniqueData = nil
+ s.cur = nil
+ if s.Iterator == nil {
+ return nil
+ }
+ return s.Iterator.Close()
}
const (