This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 722a2565b Implement entire group deletion (#1005)
722a2565b is described below

commit 722a2565bd3a5ee9a4707a08fe8f250f47c36120
Author: Huang Youliang <[email protected]>
AuthorDate: Fri Mar 13 19:32:43 2026 +0800

    Implement entire group deletion (#1005)
    
    * Implement entire group deletion
    
    
    ---------
    
    Co-authored-by: Gao Hongtao <[email protected]>
---
 api/data/data.go                                   |  21 +
 api/data/measure.go                                |   3 +
 api/data/stream.go                                 |   3 +
 api/data/trace.go                                  |   3 +
 api/proto/banyandb/database/v1/rpc.proto           |  11 +-
 banyand/internal/storage/storage.go                |   2 +
 banyand/internal/storage/tsdb.go                   |  14 +
 banyand/internal/wqueue/wqueue.go                  |  14 +
 banyand/liaison/grpc/deletion.go                   | 465 +++++++++++++++++++++
 banyand/liaison/grpc/deletion_test.go              | 275 ++++++++++++
 banyand/liaison/grpc/discovery.go                  |  63 ++-
 banyand/liaison/grpc/discovery_test.go             | 114 +++++
 banyand/liaison/grpc/measure.go                    |  28 ++
 banyand/liaison/grpc/property.go                   |  25 +-
 banyand/liaison/grpc/registry.go                   |  66 ++-
 banyand/liaison/grpc/server.go                     |  11 +-
 banyand/liaison/grpc/stream.go                     |  19 +
 banyand/liaison/grpc/trace.go                      |  19 +
 banyand/measure/svc_data.go                        |  23 +
 banyand/measure/svc_liaison.go                     |  24 ++
 banyand/measure/svc_standalone.go                  |   5 +
 banyand/metadata/client.go                         |   8 +
 banyand/metadata/metadata.go                       |   2 +
 banyand/metadata/schema/collector.go               |  82 ++++
 banyand/property/db/db.go                          |  28 ++
 banyand/property/service.go                        |  37 +-
 banyand/stream/svc_liaison.go                      |  24 ++
 banyand/stream/svc_standalone.go                   |  24 ++
 banyand/trace/handoff_controller.go                |  46 ++
 banyand/trace/metadata.go                          |  17 +-
 banyand/trace/svc_liaison.go                       |  24 ++
 banyand/trace/svc_standalone.go                    |  25 ++
 bydbctl/internal/cmd/group_test.go                 |  15 +-
 docs/api-reference.md                              |   5 +-
 pkg/index/inverted/inverted_series.go              |   6 +
 pkg/schema/cache.go                                |  23 +-
 pkg/schema/schema.go                               |   2 +
 pkg/test/setup/setup.go                            |  37 +-
 .../distributed/deletion/deletion_suite_test.go    | 313 ++++++++++++++
 .../distributed/{inspect => inspection}/common.go  |   4 +-
 .../{inspect => inspection}/etcd/suite_test.go     |   6 +-
 .../{inspect => inspection}/property/suite_test.go |   6 +-
 .../standalone/deletion/deletion_suite_test.go     | 225 ++++++++++
 .../standalone/{inspect => inspection}/common.go   |   4 +-
 .../{inspect => inspection}/etcd/suite_test.go     |   6 +-
 .../{inspect => inspection}/property/suite_test.go |   6 +-
 46 files changed, 2118 insertions(+), 65 deletions(-)

diff --git a/api/data/data.go b/api/data/data.go
index 9857bac49..b8fb4ef68 100644
--- a/api/data/data.go
+++ b/api/data/data.go
@@ -62,6 +62,9 @@ var (
                TopicStreamCollectLiaisonInfo.String():  
TopicStreamCollectLiaisonInfo,
                TopicTraceCollectDataInfo.String():      
TopicTraceCollectDataInfo,
                TopicTraceCollectLiaisonInfo.String():   
TopicTraceCollectLiaisonInfo,
+               TopicMeasureDropGroup.String():          TopicMeasureDropGroup,
+               TopicStreamDropGroup.String():           TopicStreamDropGroup,
+               TopicTraceDropGroup.String():            TopicTraceDropGroup,
        }
 
        // TopicRequestMap is the map of topic name to request message.
@@ -157,6 +160,15 @@ var (
                TopicTraceCollectLiaisonInfo: func() proto.Message {
                        return &databasev1.GroupRegistryServiceInspectRequest{}
                },
+               TopicMeasureDropGroup: func() proto.Message {
+                       return &databasev1.GroupRegistryServiceDeleteRequest{}
+               },
+               TopicStreamDropGroup: func() proto.Message {
+                       return &databasev1.GroupRegistryServiceDeleteRequest{}
+               },
+               TopicTraceDropGroup: func() proto.Message {
+                       return &databasev1.GroupRegistryServiceDeleteRequest{}
+               },
        }
 
        // TopicResponseMap is the map of topic name to response message.
@@ -207,6 +219,15 @@ var (
                TopicTraceCollectLiaisonInfo: func() proto.Message {
                        return &databasev1.LiaisonInfo{}
                },
+               TopicMeasureDropGroup: func() proto.Message {
+                       return &databasev1.GroupRegistryServiceDeleteRequest{}
+               },
+               TopicStreamDropGroup: func() proto.Message {
+                       return &databasev1.GroupRegistryServiceDeleteRequest{}
+               },
+               TopicTraceDropGroup: func() proto.Message {
+                       return &databasev1.GroupRegistryServiceDeleteRequest{}
+               },
        }
 
        // TopicCommon is the common topic for data transmission.
diff --git a/api/data/measure.go b/api/data/measure.go
index d414abeb9..a6bdcbdc8 100644
--- a/api/data/measure.go
+++ b/api/data/measure.go
@@ -109,3 +109,6 @@ var TopicMeasureCollectDataInfo = 
bus.BiTopic("measure-collect-data-info")
 
 // TopicMeasureCollectLiaisonInfo is the topic for collecting liaison info 
from liaison nodes.
 var TopicMeasureCollectLiaisonInfo = 
bus.BiTopic("measure-collect-liaison-info")
+
+// TopicMeasureDropGroup is the topic for dropping group data files.
+var TopicMeasureDropGroup = bus.BiTopic("measure-drop-group")
diff --git a/api/data/stream.go b/api/data/stream.go
index 560931a39..2130f328e 100644
--- a/api/data/stream.go
+++ b/api/data/stream.go
@@ -99,3 +99,6 @@ var TopicStreamCollectDataInfo = 
bus.BiTopic("stream-collect-data-info")
 
 // TopicStreamCollectLiaisonInfo is the topic for collecting liaison info from 
liaison nodes.
 var TopicStreamCollectLiaisonInfo = bus.BiTopic("stream-collect-liaison-info")
+
+// TopicStreamDropGroup is the topic for dropping group data files.
+var TopicStreamDropGroup = bus.BiTopic("stream-drop-group")
diff --git a/api/data/trace.go b/api/data/trace.go
index 0c7d55795..7bf8edf5a 100644
--- a/api/data/trace.go
+++ b/api/data/trace.go
@@ -81,3 +81,6 @@ var TopicTraceCollectDataInfo = 
bus.BiTopic("trace-collect-data-info")
 
 // TopicTraceCollectLiaisonInfo is the topic for collecting liaison info from 
liaison nodes.
 var TopicTraceCollectLiaisonInfo = bus.BiTopic("trace-collect-liaison-info")
+
+// TopicTraceDropGroup is the topic for dropping group data files.
+var TopicTraceDropGroup = bus.BiTopic("trace-drop-group")
diff --git a/api/proto/banyandb/database/v1/rpc.proto 
b/api/proto/banyandb/database/v1/rpc.proto
index c0893c065..d9a1cda92 100644
--- a/api/proto/banyandb/database/v1/rpc.proto
+++ b/api/proto/banyandb/database/v1/rpc.proto
@@ -364,14 +364,15 @@ message GroupRegistryServiceDeleteRequest {
   // force indicates whether to force delete the group even if it contains 
data.
   // When false, deletion will fail if the group is not empty.
   bool force = 3;
+  // data_only indicates whether to delete only data files without removing 
metadata.
+  // When true, metadata are preserved.
+  bool data_only = 4;
 }
 
 // GroupRegistryServiceDeleteResponse is the response for deleting a group.
 message GroupRegistryServiceDeleteResponse {
-  // deleted indicates whether the group was deleted.
-  bool deleted = 1;
-  // task_id is the ID of the background deletion task.
-  string task_id = 2;
+  // schema_info contains the schema resources that would be deleted 
(populated in dry-run mode).
+  SchemaInfo schema_info = 1;
 }
 
 // GroupDeletionTask represents the status of a group deletion operation.
@@ -402,6 +403,8 @@ message GroupDeletionTask {
   string message = 6;
   // created_at is the timestamp when the task was created.
   google.protobuf.Timestamp created_at = 7;
+  // updated_at is the timestamp when the task was last updated.
+  google.protobuf.Timestamp updated_at = 8;
 }
 
 // GroupRegistryServiceQueryRequest is the request for querying a group 
deletion task.
diff --git a/banyand/internal/storage/storage.go 
b/banyand/internal/storage/storage.go
index 7ff809a95..3b30bef84 100644
--- a/banyand/internal/storage/storage.go
+++ b/banyand/internal/storage/storage.go
@@ -121,6 +121,8 @@ type TSDB[T TSTable, O any] interface {
        // DeleteOldestSegment deletes exactly one oldest segment if it exists 
and meets safety rules.
        // Returns true if a segment was deleted, false otherwise.
        DeleteOldestSegment() (bool, error)
+       // Drop closes the database and removes all data files from disk.
+       Drop() error
 }
 
 // Segment is a time range of data.
diff --git a/banyand/internal/storage/tsdb.go b/banyand/internal/storage/tsdb.go
index 244774a0d..6cc5c5369 100644
--- a/banyand/internal/storage/tsdb.go
+++ b/banyand/internal/storage/tsdb.go
@@ -175,6 +175,20 @@ func (d *database[T, O]) Close() error {
        return nil
 }
 
+// Drop closes the database and removes all data files from disk.
+func (d *database[T, O]) Drop() (err error) {
+       if closeErr := d.Close(); closeErr != nil {
+               return closeErr
+       }
+       defer func() {
+               if r := recover(); r != nil {
+                       err = errors.Errorf("failed to remove database 
directory %s: %v", d.location, r)
+               }
+       }()
+       d.lfs.MustRMAll(d.location)
+       return nil
+}
+
 // OpenTSDB returns a new tsdb runtime. This constructor will create a new 
database if it's absent,
 // or load an existing one.
 func OpenTSDB[T TSTable, O any](ctx context.Context, opts TSDBOpts[T, O], 
cache Cache, group string) (TSDB[T, O], error) {
diff --git a/banyand/internal/wqueue/wqueue.go 
b/banyand/internal/wqueue/wqueue.go
index 357d5a952..b24c56059 100644
--- a/banyand/internal/wqueue/wqueue.go
+++ b/banyand/internal/wqueue/wqueue.go
@@ -125,6 +125,20 @@ func (q *Queue[S, O]) Close() error {
        return nil
 }
 
+// Drop closes the queue and removes all data files from disk.
+func (q *Queue[S, O]) Drop() (err error) {
+       if closeErr := q.Close(); closeErr != nil {
+               return closeErr
+       }
+       defer func() {
+               if r := recover(); r != nil {
+                       err = fmt.Errorf("failed to remove queue data at %s: 
%v", q.location, r)
+               }
+       }()
+       q.lfs.MustRMAll(q.location)
+       return nil
+}
+
 // Open creates and initializes a new queue with the given options.
 func Open[S SubQueue, O any](ctx context.Context, opts Opts[S, O], _ string) 
(*Queue[S, O], error) {
        p := common.GetPosition(ctx)
diff --git a/banyand/liaison/grpc/deletion.go b/banyand/liaison/grpc/deletion.go
new file mode 100644
index 000000000..60edfd033
--- /dev/null
+++ b/banyand/liaison/grpc/deletion.go
@@ -0,0 +1,465 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package grpc
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "sync"
+       "time"
+
+       "google.golang.org/protobuf/proto"
+       "google.golang.org/protobuf/types/known/timestamppb"
+
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       propertyv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+       "github.com/apache/skywalking-banyandb/banyand/metadata"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+const (
+       internalDeletionTaskGroup        = "_deletion_task"
+       internalDeletionTaskPropertyName = "_deletion_task"
+       taskDataTagName                  = "task_data"
+       taskStaleTimeout                 = 10 * time.Minute
+)
+
+type propertyApplier interface {
+       Apply(ctx context.Context, req *propertyv1.ApplyRequest) 
(*propertyv1.ApplyResponse, error)
+       Query(ctx context.Context, req *propertyv1.QueryRequest) 
(*propertyv1.QueryResponse, error)
+}
+
+type groupDeletionTaskManager struct {
+       schemaRegistry metadata.Repo
+       propServer     propertyApplier
+       log            *logger.Logger
+       groupRepo      *groupRepo
+       tasks          sync.Map
+}
+
+func newGroupDeletionTaskManager(
+       schemaRegistry metadata.Repo, propServer *propertyServer, gr 
*groupRepo, l *logger.Logger,
+) *groupDeletionTaskManager {
+       return &groupDeletionTaskManager{
+               schemaRegistry: schemaRegistry,
+               propServer:     propServer,
+               groupRepo:      gr,
+               log:            l,
+       }
+}
+
+func (m *groupDeletionTaskManager) initPropertyStorage(ctx context.Context) 
error {
+       group := &commonv1.Group{
+               Metadata: &commonv1.Metadata{
+                       Name: internalDeletionTaskGroup,
+               },
+               Catalog: commonv1.Catalog_CATALOG_PROPERTY,
+               ResourceOpts: &commonv1.ResourceOpts{
+                       ShardNum: 1,
+               },
+       }
+       _, getGroupErr := m.schemaRegistry.GroupRegistry().GetGroup(ctx, 
internalDeletionTaskGroup)
+       if getGroupErr != nil {
+               if !errors.Is(getGroupErr, schema.ErrGRPCResourceNotFound) {
+                       return fmt.Errorf("failed to get internal deletion task 
group: %w", getGroupErr)
+               }
+               if createErr := 
m.schemaRegistry.GroupRegistry().CreateGroup(ctx, group); createErr != nil {
+                       return fmt.Errorf("failed to create internal deletion 
task group: %w", createErr)
+               }
+       }
+       propSchema := &databasev1.Property{
+               Metadata: &commonv1.Metadata{
+                       Group: internalDeletionTaskGroup,
+                       Name:  internalDeletionTaskPropertyName,
+               },
+               Tags: []*databasev1.TagSpec{
+                       {
+                               Name: taskDataTagName,
+                               Type: databasev1.TagType_TAG_TYPE_DATA_BINARY,
+                       },
+               },
+       }
+       _, getPropErr := m.schemaRegistry.PropertyRegistry().GetProperty(ctx, 
propSchema.Metadata)
+       if getPropErr != nil {
+               if !errors.Is(getPropErr, schema.ErrGRPCResourceNotFound) {
+                       return fmt.Errorf("failed to get internal deletion task 
property schema: %w", getPropErr)
+               }
+               if createErr := 
m.schemaRegistry.PropertyRegistry().CreateProperty(ctx, propSchema); createErr 
!= nil {
+                       return fmt.Errorf("failed to create internal deletion 
task property schema: %w", createErr)
+               }
+       }
+       return nil
+}
+
+func (m *groupDeletionTaskManager) startDeletion(ctx context.Context, group 
string, dataOnly bool) error {
+       existingTask, getTaskErr := m.getDeletionTask(ctx, group)
+       if getTaskErr == nil {
+               switch existingTask.CurrentPhase {
+               case databasev1.GroupDeletionTask_PHASE_COMPLETED:
+                       _, getGroupErr := 
m.schemaRegistry.GroupRegistry().GetGroup(ctx, group)
+                       if getGroupErr != nil {
+                               if errors.Is(getGroupErr, 
schema.ErrGRPCResourceNotFound) {
+                                       return fmt.Errorf("group %s has already 
been deleted", group)
+                               }
+                               return fmt.Errorf("failed to check group 
existence: %w", getGroupErr)
+                       }
+                       m.tasks.Delete(group)
+               case databasev1.GroupDeletionTask_PHASE_FAILED:
+                       if _, loaded := m.tasks.LoadOrStore(group, 
existingTask); loaded {
+                               return fmt.Errorf("deletion task for group %s 
is already in progress", group)
+                       }
+                       existingTask.CurrentPhase = 
databasev1.GroupDeletionTask_PHASE_PENDING
+                       existingTask.Message = "retrying after previous failure"
+                       go m.executeDeletion(context.WithoutCancel(ctx), group, 
existingTask, dataOnly)
+                       return nil
+               case databasev1.GroupDeletionTask_PHASE_PENDING,
+                       databasev1.GroupDeletionTask_PHASE_IN_PROGRESS:
+                       if existingTask.GetUpdatedAt() != nil &&
+                               
time.Since(existingTask.GetUpdatedAt().AsTime()) < taskStaleTimeout {
+                               return fmt.Errorf("deletion task for group %s 
is already in progress (last updated %s ago)",
+                                       group, 
time.Since(existingTask.GetUpdatedAt().AsTime()).Truncate(time.Second))
+                       }
+                       if _, loaded := m.tasks.LoadOrStore(group, 
existingTask); loaded {
+                               return fmt.Errorf("deletion task for group %s 
is already in progress", group)
+                       }
+                       existingTask.CurrentPhase = 
databasev1.GroupDeletionTask_PHASE_PENDING
+                       existingTask.Message = "retrying after stale task 
detected"
+                       go m.executeDeletion(context.WithoutCancel(ctx), group, 
existingTask, dataOnly)
+                       return nil
+               }
+       }
+
+       task := &databasev1.GroupDeletionTask{
+               CurrentPhase:  databasev1.GroupDeletionTask_PHASE_PENDING,
+               TotalCounts:   make(map[string]int32),
+               DeletedCounts: make(map[string]int32),
+               CreatedAt:     timestamppb.Now(),
+       }
+       if _, loaded := m.tasks.LoadOrStore(group, task); loaded {
+               return fmt.Errorf("deletion task for group %s is already in 
progress", group)
+       }
+
+       dataInfo, dataErr := m.schemaRegistry.CollectDataInfo(ctx, group)
+       if dataErr != nil {
+               m.tasks.Delete(group)
+               return fmt.Errorf("failed to collect data info for group %s: 
%w", group, dataErr)
+       }
+       var totalDataSize int64
+       for _, di := range dataInfo {
+               totalDataSize += di.GetDataSizeBytes()
+       }
+       task.TotalDataSizeBytes = totalDataSize
+       if saveErr := m.saveDeletionTask(ctx, group, task); saveErr != nil {
+               m.tasks.Delete(group)
+               return fmt.Errorf("failed to save initial deletion task for 
group %s: %w", group, saveErr)
+       }
+       go m.executeDeletion(context.WithoutCancel(ctx), group, task, dataOnly)
+       return nil
+}
+
+func (m *groupDeletionTaskManager) executeDeletion(ctx context.Context, group 
string, task *databasev1.GroupDeletionTask, dataOnly bool) {
+       task.Message = "waiting for in-flight requests to complete"
+       m.saveProgress(ctx, group, task)
+       done := m.groupRepo.waitInflightRequests(group)
+       defer m.groupRepo.markDeleted(group)
+       <-done
+
+       task.CurrentPhase = databasev1.GroupDeletionTask_PHASE_IN_PROGRESS
+       groupMeta, getGroupErr := 
m.schemaRegistry.GroupRegistry().GetGroup(ctx, group)
+       notFound := errors.Is(getGroupErr, schema.ErrGRPCResourceNotFound)
+       if getGroupErr != nil && !notFound {
+               m.failTask(ctx, group, task, fmt.Sprintf("failed to get group 
metadata: %v", getGroupErr))
+               return
+       }
+       if !notFound && groupMeta != nil {
+               task.Message = "deleting data files"
+               m.saveProgress(ctx, group, task)
+               if dropErr := m.schemaRegistry.DropGroup(ctx, 
groupMeta.Catalog, group); dropErr != nil {
+                       m.failTask(ctx, group, task, fmt.Sprintf("failed to 
delete data files: %v", dropErr))
+                       return
+               }
+       }
+
+       if dataOnly {
+               task.CurrentPhase = databasev1.GroupDeletionTask_PHASE_COMPLETED
+               task.Message = "data files deleted successfully"
+               m.saveProgress(ctx, group, task)
+               return
+       }
+
+       opt := schema.ListOpt{Group: group}
+       type deletionStep struct {
+               fn      func() error
+               message string
+       }
+       steps := []deletionStep{
+               {func() error { return m.deleteIndexRuleBindings(ctx, opt, 
task) }, "deleting index rule bindings"},
+               {func() error { return m.deleteIndexRules(ctx, opt, task) }, 
"deleting index rules"},
+               {func() error { return m.deleteProperties(ctx, opt, task) }, 
"deleting properties"},
+               {func() error { return m.deleteStreams(ctx, opt, task) }, 
"deleting streams"},
+               {func() error { return m.deleteMeasures(ctx, opt, task) }, 
"deleting measures"},
+               {func() error { return m.deleteTraces(ctx, opt, task) }, 
"deleting traces"},
+               {func() error { return m.deleteTopNAggregations(ctx, opt, task) 
}, "deleting topN aggregations"},
+               {func() error { return m.deleteGroup(ctx, group) }, "deleting 
group"},
+       }
+       for _, step := range steps {
+               if stepErr := m.runStep(ctx, group, task, step.message, 
step.fn); stepErr != nil {
+                       return
+               }
+       }
+
+       task.CurrentPhase = databasev1.GroupDeletionTask_PHASE_COMPLETED
+       task.Message = "group deleted successfully"
+       m.saveProgress(ctx, group, task)
+}
+
+func (m *groupDeletionTaskManager) deleteIndexRuleBindings(
+       ctx context.Context, opt schema.ListOpt, task 
*databasev1.GroupDeletionTask,
+) error {
+       bindings, listErr := 
m.schemaRegistry.IndexRuleBindingRegistry().ListIndexRuleBinding(ctx, opt)
+       if listErr != nil {
+               return listErr
+       }
+       task.TotalCounts["index_rule_binding"] = int32(len(bindings))
+       for _, binding := range bindings {
+               if _, deleteErr := 
m.schemaRegistry.IndexRuleBindingRegistry().DeleteIndexRuleBinding(ctx, 
binding.GetMetadata()); deleteErr != nil {
+                       return fmt.Errorf("index rule binding %s: %w", 
binding.GetMetadata().GetName(), deleteErr)
+               }
+       }
+       task.DeletedCounts["index_rule_binding"] = 
task.TotalCounts["index_rule_binding"]
+       return nil
+}
+
+func (m *groupDeletionTaskManager) deleteIndexRules(
+       ctx context.Context, opt schema.ListOpt, task 
*databasev1.GroupDeletionTask,
+) error {
+       indexRules, listErr := 
m.schemaRegistry.IndexRuleRegistry().ListIndexRule(ctx, opt)
+       if listErr != nil {
+               return listErr
+       }
+       task.TotalCounts["index_rule"] = int32(len(indexRules))
+       for _, rule := range indexRules {
+               if _, deleteErr := 
m.schemaRegistry.IndexRuleRegistry().DeleteIndexRule(ctx, rule.GetMetadata()); 
deleteErr != nil {
+                       return fmt.Errorf("index rule %s: %w", 
rule.GetMetadata().GetName(), deleteErr)
+               }
+       }
+       task.DeletedCounts["index_rule"] = task.TotalCounts["index_rule"]
+       return nil
+}
+
+func (m *groupDeletionTaskManager) deleteProperties(
+       ctx context.Context, opt schema.ListOpt, task 
*databasev1.GroupDeletionTask,
+) error {
+       properties, listErr := 
m.schemaRegistry.PropertyRegistry().ListProperty(ctx, opt)
+       if listErr != nil {
+               return listErr
+       }
+       task.TotalCounts["property"] = int32(len(properties))
+       for _, prop := range properties {
+               if _, deleteErr := 
m.schemaRegistry.PropertyRegistry().DeleteProperty(ctx, prop.GetMetadata()); 
deleteErr != nil {
+                       return fmt.Errorf("property %s: %w", 
prop.GetMetadata().GetName(), deleteErr)
+               }
+       }
+       task.DeletedCounts["property"] = task.TotalCounts["property"]
+       return nil
+}
+
+func (m *groupDeletionTaskManager) deleteStreams(
+       ctx context.Context, opt schema.ListOpt, task 
*databasev1.GroupDeletionTask,
+) error {
+       streams, listErr := m.schemaRegistry.StreamRegistry().ListStream(ctx, 
opt)
+       if listErr != nil {
+               return listErr
+       }
+       task.TotalCounts["stream"] = int32(len(streams))
+       for _, stream := range streams {
+               if _, deleteErr := 
m.schemaRegistry.StreamRegistry().DeleteStream(ctx, stream.GetMetadata()); 
deleteErr != nil {
+                       return fmt.Errorf("stream %s: %w", 
stream.GetMetadata().GetName(), deleteErr)
+               }
+       }
+       task.DeletedCounts["stream"] = task.TotalCounts["stream"]
+       return nil
+}
+
+func (m *groupDeletionTaskManager) deleteMeasures(
+       ctx context.Context, opt schema.ListOpt, task 
*databasev1.GroupDeletionTask,
+) error {
+       measures, listErr := 
m.schemaRegistry.MeasureRegistry().ListMeasure(ctx, opt)
+       if listErr != nil {
+               return listErr
+       }
+       task.TotalCounts["measure"] = int32(len(measures))
+       for _, measure := range measures {
+               if _, deleteErr := 
m.schemaRegistry.MeasureRegistry().DeleteMeasure(ctx, measure.GetMetadata()); 
deleteErr != nil {
+                       return fmt.Errorf("measure %s: %w", 
measure.GetMetadata().GetName(), deleteErr)
+               }
+       }
+       task.DeletedCounts["measure"] = task.TotalCounts["measure"]
+       return nil
+}
+
+func (m *groupDeletionTaskManager) deleteTraces(
+       ctx context.Context, opt schema.ListOpt, task 
*databasev1.GroupDeletionTask,
+) error {
+       traces, listErr := m.schemaRegistry.TraceRegistry().ListTrace(ctx, opt)
+       if listErr != nil {
+               return listErr
+       }
+       task.TotalCounts["trace"] = int32(len(traces))
+       for _, trace := range traces {
+               if _, deleteErr := 
m.schemaRegistry.TraceRegistry().DeleteTrace(ctx, trace.GetMetadata()); 
deleteErr != nil {
+                       return fmt.Errorf("trace %s: %w", 
trace.GetMetadata().GetName(), deleteErr)
+               }
+       }
+       task.DeletedCounts["trace"] = task.TotalCounts["trace"]
+       return nil
+}
+
+func (m *groupDeletionTaskManager) deleteTopNAggregations(
+       ctx context.Context, opt schema.ListOpt, task 
*databasev1.GroupDeletionTask,
+) error {
+       topNAggs, listErr := 
m.schemaRegistry.TopNAggregationRegistry().ListTopNAggregation(ctx, opt)
+       if listErr != nil {
+               return listErr
+       }
+       task.TotalCounts["topn_aggregation"] = int32(len(topNAggs))
+       for _, agg := range topNAggs {
+               if _, deleteErr := 
m.schemaRegistry.TopNAggregationRegistry().DeleteTopNAggregation(ctx, 
agg.GetMetadata()); deleteErr != nil {
+                       return fmt.Errorf("topN aggregation %s: %w", 
agg.GetMetadata().GetName(), deleteErr)
+               }
+       }
+       task.DeletedCounts["topn_aggregation"] = 
task.TotalCounts["topn_aggregation"]
+       return nil
+}
+
+func (m *groupDeletionTaskManager) deleteGroup(ctx context.Context, group 
string) error {
+       _, deleteErr := m.schemaRegistry.GroupRegistry().DeleteGroup(ctx, group)
+       if deleteErr != nil && !errors.Is(deleteErr, 
schema.ErrGRPCResourceNotFound) {
+               return fmt.Errorf("failed to delete group %s: %w", group, 
deleteErr)
+       }
+       return nil
+}
+
+func (m *groupDeletionTaskManager) runStep(
+       ctx context.Context, group string, task *databasev1.GroupDeletionTask,
+       message string, fn func() error,
+) error {
+       task.Message = message
+       if stepErr := fn(); stepErr != nil {
+               m.failTask(ctx, group, task, fmt.Sprintf("%s failed: %v", 
message, stepErr))
+               return stepErr
+       }
+       m.saveProgress(ctx, group, task)
+       return nil
+}
+
+func (m *groupDeletionTaskManager) saveProgress(ctx context.Context, group 
string, task *databasev1.GroupDeletionTask) {
+       task.UpdatedAt = timestamppb.Now()
+       snapshot := proto.Clone(task).(*databasev1.GroupDeletionTask)
+       m.tasks.Store(group, snapshot)
+       if saveErr := m.saveDeletionTask(ctx, group, snapshot); saveErr != nil {
+               m.log.Error().Err(saveErr).Str("group", group).Msg("failed to 
save deletion progress")
+       }
+}
+
+func (m *groupDeletionTaskManager) failTask(
+       ctx context.Context, group string, task *databasev1.GroupDeletionTask, 
msg string,
+) {
+       task.CurrentPhase = databasev1.GroupDeletionTask_PHASE_FAILED
+       task.Message = msg
+       m.log.Error().Str("group", group).Msg(msg)
+       m.saveProgress(ctx, group, task)
+}
+
+func (m *groupDeletionTaskManager) saveDeletionTask(ctx context.Context, group 
string, task *databasev1.GroupDeletionTask) error {
+       taskData, marshalErr := proto.Marshal(task)
+       if marshalErr != nil {
+               return fmt.Errorf("failed to marshal deletion task: %w", 
marshalErr)
+       }
+       _, applyErr := m.propServer.Apply(ctx, &propertyv1.ApplyRequest{
+               Property: &propertyv1.Property{
+                       Metadata: &commonv1.Metadata{
+                               Group: internalDeletionTaskGroup,
+                               Name:  internalDeletionTaskPropertyName,
+                       },
+                       Id: group,
+                       Tags: []*modelv1.Tag{
+                               {
+                                       Key:   taskDataTagName,
+                                       Value: &modelv1.TagValue{Value: 
&modelv1.TagValue_BinaryData{BinaryData: taskData}},
+                               },
+                       },
+               },
+               Strategy: propertyv1.ApplyRequest_STRATEGY_REPLACE,
+       })
+       if applyErr != nil {
+               return fmt.Errorf("failed to save deletion task property: %w", 
applyErr)
+       }
+       return nil
+}
+
+func (m *groupDeletionTaskManager) getDeletionTask(ctx context.Context, group 
string) (*databasev1.GroupDeletionTask, error) {
+       if v, ok := m.tasks.Load(group); ok {
+               if task, isTask := v.(*databasev1.GroupDeletionTask); isTask {
+                       return 
proto.Clone(task).(*databasev1.GroupDeletionTask), nil
+               }
+       }
+       resp, queryErr := m.propServer.Query(ctx, &propertyv1.QueryRequest{
+               Groups: []string{internalDeletionTaskGroup},
+               Name:   internalDeletionTaskPropertyName,
+               Ids:    []string{group},
+               Limit:  1,
+       })
+       if queryErr != nil {
+               return nil, fmt.Errorf("failed to query deletion task property: 
%w", queryErr)
+       }
+       if len(resp.Properties) == 0 {
+               return nil, fmt.Errorf("deletion task for group %s not found", 
group)
+       }
+       for _, tag := range resp.Properties[0].Tags {
+               if tag.Key == taskDataTagName {
+                       binaryData := tag.Value.GetBinaryData()
+                       if binaryData == nil {
+                               return nil, fmt.Errorf("deletion task for group 
%s has no binary data", group)
+                       }
+                       var task databasev1.GroupDeletionTask
+                       if unmarshalErr := proto.Unmarshal(binaryData, &task); 
unmarshalErr != nil {
+                               return nil, fmt.Errorf("failed to unmarshal 
deletion task: %w", unmarshalErr)
+                       }
+                       return &task, nil
+               }
+       }
+       return nil, fmt.Errorf("deletion task for group %s has no task_data 
tag", group)
+}
+
+func (m *groupDeletionTaskManager) hasNonEmptyResources(ctx context.Context, 
group string) (bool, error) {
+       dataInfo, dataErr := m.schemaRegistry.CollectDataInfo(ctx, group)
+       if dataErr != nil {
+               return false, fmt.Errorf("failed to collect data info: %w", 
dataErr)
+       }
+       for _, di := range dataInfo {
+               if di.GetDataSizeBytes() > 0 {
+                       return true, nil
+               }
+       }
+       return false, nil
+}
diff --git a/banyand/liaison/grpc/deletion_test.go 
b/banyand/liaison/grpc/deletion_test.go
new file mode 100644
index 000000000..88b810a7c
--- /dev/null
+++ b/banyand/liaison/grpc/deletion_test.go
@@ -0,0 +1,275 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package grpc
+
+import (
+       "context"
+       "errors"
+       "sync"
+       "testing"
+       "time"
+
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+       "go.uber.org/mock/gomock"
+       "google.golang.org/protobuf/proto"
+       "google.golang.org/protobuf/types/known/timestamppb"
+
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       propertyv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+       "github.com/apache/skywalking-banyandb/banyand/metadata"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+// mockPropertyApplier is a test double whose Apply and Query calls are
+// forwarded to the configurable applyFn and queryFn hooks.
+type mockPropertyApplier struct {
+       applyFn func(ctx context.Context, req *propertyv1.ApplyRequest) (*propertyv1.ApplyResponse, error)
+       queryFn func(ctx context.Context, req *propertyv1.QueryRequest) (*propertyv1.QueryResponse, error)
+}
+
+// Apply hands the request to applyFn and returns its result unchanged.
+func (mpa *mockPropertyApplier) Apply(ctx context.Context, req *propertyv1.ApplyRequest) (*propertyv1.ApplyResponse, error) {
+       applyResp, applyErr := mpa.applyFn(ctx, req)
+       return applyResp, applyErr
+}
+
+// Query hands the request to queryFn and returns its result unchanged.
+func (mpa *mockPropertyApplier) Query(ctx context.Context, req *propertyv1.QueryRequest) (*propertyv1.QueryResponse, error) {
+       queryResp, queryErr := mpa.queryFn(ctx, req)
+       return queryResp, queryErr
+}
+
+// stubIndexRuleBinding is a no-op stand-in for schema.IndexRuleBinding: reads
+// return nil results, writes succeed, and deletes report true.
+type stubIndexRuleBinding struct{}
+
+// GetIndexRuleBinding always returns a nil binding and a nil error.
+func (*stubIndexRuleBinding) GetIndexRuleBinding(context.Context, *commonv1.Metadata) (*databasev1.IndexRuleBinding, error) {
+       return nil, nil
+}
+
+// ListIndexRuleBinding always returns a nil slice and a nil error.
+func (*stubIndexRuleBinding) ListIndexRuleBinding(context.Context, schema.ListOpt) ([]*databasev1.IndexRuleBinding, error) {
+       return nil, nil
+}
+
+// CreateIndexRuleBinding accepts any binding and reports success.
+func (*stubIndexRuleBinding) CreateIndexRuleBinding(context.Context, *databasev1.IndexRuleBinding) error {
+       return nil
+}
+
+// UpdateIndexRuleBinding accepts any binding and reports success.
+func (*stubIndexRuleBinding) UpdateIndexRuleBinding(context.Context, *databasev1.IndexRuleBinding) error {
+       return nil
+}
+
+// DeleteIndexRuleBinding always reports that the binding was deleted.
+func (*stubIndexRuleBinding) DeleteIndexRuleBinding(context.Context, *commonv1.Metadata) (bool, error) {
+       return true, nil
+}
+
+// stubIndexRule is a no-op stand-in for schema.IndexRule: reads return nil
+// results, writes succeed, and deletes report true.
+type stubIndexRule struct{}
+
+// GetIndexRule always returns a nil rule and a nil error.
+func (*stubIndexRule) GetIndexRule(context.Context, *commonv1.Metadata) (*databasev1.IndexRule, error) {
+       return nil, nil
+}
+
+// ListIndexRule always returns a nil slice and a nil error.
+func (*stubIndexRule) ListIndexRule(context.Context, schema.ListOpt) ([]*databasev1.IndexRule, error) {
+       return nil, nil
+}
+
+// CreateIndexRule accepts any rule and reports success.
+func (*stubIndexRule) CreateIndexRule(context.Context, *databasev1.IndexRule) error {
+       return nil
+}
+
+// UpdateIndexRule accepts any rule and reports success.
+func (*stubIndexRule) UpdateIndexRule(context.Context, *databasev1.IndexRule) error {
+       return nil
+}
+
+// DeleteIndexRule always reports that the rule was deleted.
+func (*stubIndexRule) DeleteIndexRule(context.Context, *commonv1.Metadata) (bool, error) {
+       return true, nil
+}
+
+func TestHasNonEmptyResources(t *testing.T) {
+       tests := []struct {
+               name     string
+               infos    []*databasev1.DataInfo
+               expected bool
+       }{
+               {
+                       name:     "all zero sizes",
+                       infos:    []*databasev1.DataInfo{{DataSizeBytes: 0}, 
{DataSizeBytes: 0}},
+                       expected: false,
+               },
+               {
+                       name:     "one non-zero size",
+                       infos:    []*databasev1.DataInfo{{DataSizeBytes: 0}, 
{DataSizeBytes: 1024}},
+                       expected: true,
+               },
+       }
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       ctrl := gomock.NewController(t)
+                       defer ctrl.Finish()
+
+                       mockRepo := metadata.NewMockRepo(ctrl)
+                       mockRepo.EXPECT().CollectDataInfo(gomock.Any(), 
"test-group").Return(tt.infos, nil)
+
+                       m := &groupDeletionTaskManager{schemaRegistry: mockRepo}
+                       hasResources, checkErr := 
m.hasNonEmptyResources(context.Background(), "test-group")
+                       require.NoError(t, checkErr)
+                       assert.Equal(t, tt.expected, hasResources)
+               })
+       }
+}
+
+// TestDeletion exercises startDeletion: a duplicate request is rejected, and a
+// regular request moves from the pending phase to the completed phase.
+func TestDeletion(t *testing.T) {
+       t.Run("duplicate prevention", func(t *testing.T) {
+               const existing = "existing-group"
+               manager := &groupDeletionTaskManager{}
+               // Seed an in-progress task so the start request below collides with it.
+               manager.tasks.Store(existing, &databasev1.GroupDeletionTask{
+                       CurrentPhase: databasev1.GroupDeletionTask_PHASE_IN_PROGRESS,
+                       UpdatedAt:    timestamppb.Now(),
+               })
+
+               startErr := manager.startDeletion(context.Background(), existing, false)
+               require.Error(t, startErr)
+               assert.Contains(t, startErr.Error(), "already in progress")
+       })
+
+       t.Run("deletion", func(t *testing.T) {
+               ctrl := gomock.NewController(t)
+               defer ctrl.Finish()
+
+               const group = "test-group"
+               ctx := context.Background()
+               listOpt := schema.ListOpt{Group: group}
+               inflightRepo := &groupRepo{
+                       log:          logger.GetLogger("test"),
+                       resourceOpts: make(map[string]*commonv1.ResourceOpts),
+                       inflight:     make(map[string]*groupInflight),
+               }
+               // Keep one request in flight; the test expects the task to stay
+               // pending until this request is released further down.
+               require.NoError(t, inflightRepo.acquireRequest(group))
+
+               // storedTask holds the most recently applied task; taskMu guards it
+               // because the hooks may run on another goroutine than the test.
+               var (
+                       taskMu     sync.Mutex
+                       storedTask = &databasev1.GroupDeletionTask{}
+               )
+               applier := &mockPropertyApplier{
+                       applyFn: func(_ context.Context, req *propertyv1.ApplyRequest) (*propertyv1.ApplyResponse, error) {
+                               for _, tag := range req.Property.Tags {
+                                       if tag.Key != taskDataTagName {
+                                               continue
+                                       }
+                                       var decoded databasev1.GroupDeletionTask
+                                       if unmarshalErr := proto.Unmarshal(tag.Value.GetBinaryData(), &decoded); unmarshalErr != nil {
+                                               // Payloads that fail to decode are ignored.
+                                               continue
+                                       }
+                                       snapshot := proto.Clone(&decoded).(*databasev1.GroupDeletionTask)
+                                       taskMu.Lock()
+                                       storedTask = snapshot
+                                       taskMu.Unlock()
+                               }
+                               return &propertyv1.ApplyResponse{}, nil
+                       },
+                       queryFn: func(_ context.Context, _ *propertyv1.QueryRequest) (*propertyv1.QueryResponse, error) {
+                               taskMu.Lock()
+                               snapshot := proto.Clone(storedTask).(*databasev1.GroupDeletionTask)
+                               taskMu.Unlock()
+                               payload, _ := proto.Marshal(snapshot)
+                               dataTag := &modelv1.Tag{
+                                       Key:   taskDataTagName,
+                                       Value: &modelv1.TagValue{Value: &modelv1.TagValue_BinaryData{BinaryData: payload}},
+                               }
+                               found := &propertyv1.Property{Id: group, Tags: []*modelv1.Tag{dataTag}}
+                               return &propertyv1.QueryResponse{Properties: []*propertyv1.Property{found}}, nil
+                       },
+               }
+
+               schemaRepo := metadata.NewMockRepo(ctrl)
+               schemaRepo.EXPECT().CollectDataInfo(gomock.Any(), group).Return([]*databasev1.DataInfo{{DataSizeBytes: 512}}, nil)
+               schemaRepo.EXPECT().IndexRuleBindingRegistry().Return(&stubIndexRuleBinding{})
+               schemaRepo.EXPECT().IndexRuleRegistry().Return(&stubIndexRule{})
+
+               // Every per-kind registry reports an empty listing for the group.
+               propertyReg := schema.NewMockProperty(ctrl)
+               propertyReg.EXPECT().ListProperty(gomock.Any(), listOpt).Return(nil, nil)
+               schemaRepo.EXPECT().PropertyRegistry().Return(propertyReg)
+
+               streamReg := schema.NewMockStream(ctrl)
+               streamReg.EXPECT().ListStream(gomock.Any(), listOpt).Return(nil, nil)
+               schemaRepo.EXPECT().StreamRegistry().Return(streamReg)
+
+               measureReg := schema.NewMockMeasure(ctrl)
+               measureReg.EXPECT().ListMeasure(gomock.Any(), listOpt).Return(nil, nil)
+               schemaRepo.EXPECT().MeasureRegistry().Return(measureReg)
+
+               traceReg := schema.NewMockTrace(ctrl)
+               traceReg.EXPECT().ListTrace(gomock.Any(), listOpt).Return(nil, nil)
+               schemaRepo.EXPECT().TraceRegistry().Return(traceReg)
+
+               topNReg := schema.NewMockTopNAggregation(ctrl)
+               topNReg.EXPECT().ListTopNAggregation(gomock.Any(), listOpt).Return(nil, nil)
+               schemaRepo.EXPECT().TopNAggregationRegistry().Return(topNReg)
+
+               groupReg := schema.NewMockGroup(ctrl)
+               groupReg.EXPECT().GetGroup(gomock.Any(), group).Return(&commonv1.Group{
+                       Metadata: &commonv1.Metadata{Name: group},
+                       Catalog:  commonv1.Catalog_CATALOG_STREAM,
+               }, nil)
+               schemaRepo.EXPECT().DropGroup(gomock.Any(), commonv1.Catalog_CATALOG_STREAM, group).Return(nil)
+               groupReg.EXPECT().DeleteGroup(gomock.Any(), group).DoAndReturn(
+                       func(_ context.Context, deleted string) (bool, error) {
+                               // Deliver the delete event to the group repo shortly after
+                               // DeleteGroup has returned, from a separate goroutine.
+                               go func() {
+                                       time.Sleep(10 * time.Millisecond)
+                                       deletedSpec := &commonv1.Group{
+                                               Metadata:     &commonv1.Metadata{Name: deleted},
+                                               ResourceOpts: &commonv1.ResourceOpts{ShardNum: 1},
+                                               Catalog:      commonv1.Catalog_CATALOG_STREAM,
+                                       }
+                                       inflightRepo.OnDelete(schema.Metadata{
+                                               TypeMeta: schema.TypeMeta{Kind: schema.KindGroup},
+                                               Spec:     deletedSpec,
+                                       })
+                               }()
+                               return true, nil
+                       },
+               )
+               schemaRepo.EXPECT().GroupRegistry().Return(groupReg).Times(2)
+
+               manager := &groupDeletionTaskManager{
+                       schemaRegistry: schemaRepo,
+                       propServer:     applier,
+                       groupRepo:      inflightRepo,
+                       log:            logger.GetLogger("test"),
+               }
+               require.NoError(t, manager.startDeletion(ctx, group, false))
+               // Poll until new requests for the group are refused as pending deletion.
+               require.Eventually(t, func() bool {
+                       acquireErr := inflightRepo.acquireRequest(group)
+                       if acquireErr != nil {
+                               return errors.Is(acquireErr, errGroupPendingDeletion)
+                       }
+                       inflightRepo.releaseRequest(group)
+                       return false
+               }, 2*time.Second, 10*time.Millisecond)
+
+               pendingTask, pendingErr := manager.getDeletionTask(ctx, group)
+               require.NoError(t, pendingErr)
+               assert.Equal(t, databasev1.GroupDeletionTask_PHASE_PENDING, pendingTask.GetCurrentPhase())
+               assert.Equal(t, int64(512), pendingTask.GetTotalDataSizeBytes())
+
+               // Release the request acquired at the top, then wait for completion.
+               inflightRepo.releaseRequest(group)
+               require.Eventually(t, func() bool {
+                       polled, pollErr := manager.getDeletionTask(ctx, group)
+                       if pollErr != nil {
+                               return false
+                       }
+                       return polled.GetCurrentPhase() == databasev1.GroupDeletionTask_PHASE_COMPLETED
+               }, 5*time.Second, 20*time.Millisecond)
+
+               finalTask, finalErr := manager.getDeletionTask(ctx, group)
+               require.NoError(t, finalErr)
+               assert.Equal(t, databasev1.GroupDeletionTask_PHASE_COMPLETED, finalTask.GetCurrentPhase())
+               assert.Equal(t, "group deleted successfully", finalTask.GetMessage())
+       })
+}
diff --git a/banyand/liaison/grpc/discovery.go 
b/banyand/liaison/grpc/discovery.go
index db6953dd5..536e9a484 100644
--- a/banyand/liaison/grpc/discovery.go
+++ b/banyand/liaison/grpc/discovery.go
@@ -34,7 +34,10 @@ import (
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
 )
 
-var errNotExist = errors.New("the object doesn't exist")
+var (
+       errNotExist             = errors.New("the object doesn't exist")
+       errGroupPendingDeletion = errors.New("group is pending deletion")
+)
 
 type discoveryService struct {
        metadataRepo    metadata.Repo
@@ -138,13 +141,71 @@ func (i identity) String() string {
 
 var _ schema.EventHandler = (*groupRepo)(nil)
 
+// groupInflight is the per-group bookkeeping for in-flight requests.
+//
+// wg is incremented by acquireRequest and decremented by releaseRequest. done
+// stays nil until waitInflightRequests is called for the group; from then on
+// acquireRequest refuses new requests and done is closed once wg drains.
+type groupInflight struct {
+       done chan struct{}
+       wg   sync.WaitGroup
+}
+
 type groupRepo struct {
        schema.UnimplementedOnInitHandler
        log          *logger.Logger
        resourceOpts map[string]*commonv1.ResourceOpts
+       inflight     map[string]*groupInflight
        sync.RWMutex
 }
 
+// acquireRequest registers one in-flight request for groupName.
+//
+// It returns an error wrapping errGroupPendingDeletion when the group's done
+// channel is already set, i.e. waitInflightRequests has been called for it.
+// Otherwise the group's tracking entry is created on demand and its WaitGroup
+// is incremented; callers pair every successful call with releaseRequest.
+func (s *groupRepo) acquireRequest(groupName string) error {
+       s.RWMutex.Lock()
+       defer s.RWMutex.Unlock()
+       if s.inflight == nil {
+               // Writing to a nil map panics; allocate lazily so a groupRepo built
+               // without the inflight map still works.
+               s.inflight = make(map[string]*groupInflight)
+       }
+       item, ok := s.inflight[groupName]
+       if ok && item.done != nil {
+               return fmt.Errorf("%s: %w", groupName, errGroupPendingDeletion)
+       }
+       if !ok {
+               item = &groupInflight{}
+               s.inflight[groupName] = item
+       }
+       // Add runs under the write lock and only while done is nil, so it always
+       // precedes the Wait that waitInflightRequests starts after setting done.
+       item.wg.Add(1)
+       return nil
+}
+
+// releaseRequest marks one in-flight request for groupName as finished by
+// decrementing the group's WaitGroup. It does nothing when the group has no
+// tracking entry.
+func (s *groupRepo) releaseRequest(groupName string) {
+       s.RWMutex.RLock()
+       tracked, found := s.inflight[groupName]
+       s.RWMutex.RUnlock()
+       if !found {
+               return
+       }
+       // Done is called outside the lock; only the map lookup needs protection.
+       tracked.wg.Done()
+}
+
+// waitInflightRequests returns a channel that is closed once every request
+// acquired for groupName has been released.
+//
+// The first call for a group sets the entry's done channel, which makes later
+// acquireRequest calls fail with errGroupPendingDeletion, and starts a
+// goroutine that closes the channel when the WaitGroup drains. Further calls
+// return the same channel without starting another goroutine.
+func (s *groupRepo) waitInflightRequests(groupName string) <-chan struct{} {
+       s.RWMutex.Lock()
+       if s.inflight == nil {
+               // Writing to a nil map panics; allocate lazily so a groupRepo built
+               // without the inflight map still works.
+               s.inflight = make(map[string]*groupInflight)
+       }
+       item, ok := s.inflight[groupName]
+       if !ok {
+               item = &groupInflight{}
+               s.inflight[groupName] = item
+       }
+       if item.done != nil {
+               // A wait is already in progress; share its channel.
+               ch := item.done
+               s.RWMutex.Unlock()
+               return ch
+       }
+       ch := make(chan struct{})
+       item.done = ch
+       s.RWMutex.Unlock()
+       // Wait outside the lock so releaseRequest can still look the entry up.
+       go func(wg *sync.WaitGroup, done chan struct{}) {
+               wg.Wait()
+               close(done)
+       }(&item.wg, ch)
+       return ch
+}
+
+// markDeleted drops the in-flight tracking entry of groupName, including any
+// done channel recorded for it, while holding the write lock.
+func (s *groupRepo) markDeleted(groupName string) {
+       s.RWMutex.Lock()
+       delete(s.inflight, groupName)
+       s.RWMutex.Unlock()
+}
+
 func (s *groupRepo) OnAddOrUpdate(schemaMetadata schema.Metadata) {
        if schemaMetadata.Kind != schema.KindGroup {
                return
diff --git a/banyand/liaison/grpc/discovery_test.go 
b/banyand/liaison/grpc/discovery_test.go
new file mode 100644
index 000000000..291b6786d
--- /dev/null
+++ b/banyand/liaison/grpc/discovery_test.go
@@ -0,0 +1,114 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package grpc
+
+import (
+       "testing"
+       "time"
+
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+func TestGroupRepo_AcquireAndRelease(t *testing.T) {
+       gr := &groupRepo{
+               log:          logger.GetLogger("test"),
+               resourceOpts: make(map[string]*commonv1.ResourceOpts),
+               inflight:     make(map[string]*groupInflight),
+       }
+
+       require.NoError(t, gr.acquireRequest("test-group"))
+       require.NoError(t, gr.acquireRequest("test-group"))
+       gr.releaseRequest("test-group")
+       gr.releaseRequest("test-group")
+
+       gr.RWMutex.RLock()
+       item, ok := gr.inflight["test-group"]
+       gr.RWMutex.RUnlock()
+       assert.True(t, ok)
+       assert.Nil(t, item.done)
+}
+
+func TestGroupRepo_AcquireDuringPendingDeletion(t *testing.T) {
+       gr := &groupRepo{
+               log:          logger.GetLogger("test"),
+               resourceOpts: make(map[string]*commonv1.ResourceOpts),
+               inflight:     make(map[string]*groupInflight),
+       }
+
+       _ = gr.waitInflightRequests("del-group")
+       err := gr.acquireRequest("del-group")
+       assert.ErrorIs(t, err, errGroupPendingDeletion)
+}
+
+// TestGroupRepo_WaitInflightRequests verifies that the channel returned by
+// waitInflightRequests stays open while requests are in flight and closes once
+// they are all released (or right away when none were acquired).
+func TestGroupRepo_WaitInflightRequests(t *testing.T) {
+       const groupName = "g"
+       scenarios := []struct {
+               name       string
+               acquireNum int
+       }{
+               {name: "completes after release", acquireNum: 1},
+               {name: "no inflight", acquireNum: 0},
+       }
+       for _, sc := range scenarios {
+               t.Run(sc.name, func(t *testing.T) {
+                       repo := &groupRepo{
+                               log:          logger.GetLogger("test"),
+                               resourceOpts: make(map[string]*commonv1.ResourceOpts),
+                               inflight:     make(map[string]*groupInflight),
+                       }
+                       for range sc.acquireNum {
+                               require.NoError(t, repo.acquireRequest(groupName))
+                       }
+                       drained := repo.waitInflightRequests(groupName)
+                       if sc.acquireNum > 0 {
+                               // Non-blocking probe: the channel must still be open here.
+                               select {
+                               case <-drained:
+                                       t.Fatal("done channel should not be closed while requests are in flight")
+                               default:
+                               }
+                               for range sc.acquireNum {
+                                       repo.releaseRequest(groupName)
+                               }
+                       }
+                       select {
+                       case <-drained:
+                       case <-time.After(2 * time.Second):
+                               t.Fatal("done channel did not close after all requests were released")
+                       }
+               })
+       }
+}
+
+func TestGroupRepo_MarkDeleted(t *testing.T) {
+       gr := &groupRepo{
+               log:          logger.GetLogger("test"),
+               resourceOpts: make(map[string]*commonv1.ResourceOpts),
+               inflight:     make(map[string]*groupInflight),
+       }
+
+       gr.waitInflightRequests("g3")
+       gr.markDeleted("g3")
+
+       gr.RWMutex.RLock()
+       _, ok := gr.inflight["g3"]
+       gr.RWMutex.RUnlock()
+       assert.False(t, ok)
+}
diff --git a/banyand/liaison/grpc/measure.go b/banyand/liaison/grpc/measure.go
index 125c8aeaa..8076a9c61 100644
--- a/banyand/liaison/grpc/measure.go
+++ b/banyand/liaison/grpc/measure.go
@@ -138,14 +138,22 @@ func (ms *measureService) Write(measure 
measurev1.MeasureService_WriteServer) er
 
                ms.metrics.totalStreamMsgReceived.Inc(1, metadata.Group, 
"measure", "write")
 
+               if acquireErr := ms.groupRepo.acquireRequest(metadata.Group); 
acquireErr != nil {
+                       ms.sendReply(metadata, 
modelv1.Status_STATUS_INTERNAL_ERROR, writeRequest.GetMessageId(), measure)
+                       continue
+               }
+
                if status := ms.validateWriteRequest(writeRequest, metadata, 
measure); status != modelv1.Status_STATUS_SUCCEED {
+                       ms.groupRepo.releaseRequest(metadata.Group)
                        continue
                }
 
                if err := ms.processAndPublishRequest(ctx, writeRequest, 
metadata, spec,
                        specEntityLocator, specShardingKeyLocator, publisher, 
&succeedSent, measure, nodeMetadataSent, nodeSpecSent); err != nil {
+                       ms.groupRepo.releaseRequest(metadata.Group)
                        continue
                }
+               ms.groupRepo.releaseRequest(metadata.Group)
        }
 }
 
@@ -358,6 +366,16 @@ func (ms *measureService) handleWriteCleanup(publisher 
queue.BatchPublisher, suc
 var emptyMeasureQueryResponse = &measurev1.QueryResponse{DataPoints: 
make([]*measurev1.DataPoint, 0)}
 
 func (ms *measureService) Query(ctx context.Context, req 
*measurev1.QueryRequest) (resp *measurev1.QueryResponse, err error) {
+       for _, g := range req.Groups {
+               if acquireErr := ms.groupRepo.acquireRequest(g); acquireErr != 
nil {
+                       return nil, status.Errorf(codes.FailedPrecondition, 
"group %s is pending deletion", g)
+               }
+       }
+       defer func() {
+               for _, g := range req.Groups {
+                       ms.groupRepo.releaseRequest(g)
+               }
+       }()
        for _, g := range req.Groups {
                ms.metrics.totalStarted.Inc(1, g, "measure", "query")
        }
@@ -424,6 +442,16 @@ func (ms *measureService) Query(ctx context.Context, req 
*measurev1.QueryRequest
 }
 
 func (ms *measureService) TopN(ctx context.Context, topNRequest 
*measurev1.TopNRequest) (resp *measurev1.TopNResponse, err error) {
+       for _, g := range topNRequest.GetGroups() {
+               if acquireErr := ms.groupRepo.acquireRequest(g); acquireErr != 
nil {
+                       return nil, status.Errorf(codes.FailedPrecondition, 
"group %s is pending deletion", g)
+               }
+       }
+       defer func() {
+               for _, g := range topNRequest.GetGroups() {
+                       ms.groupRepo.releaseRequest(g)
+               }
+       }()
        start := time.Now()
        defer func() {
                duration := time.Since(start)
diff --git a/banyand/liaison/grpc/property.go b/banyand/liaison/grpc/property.go
index c62957991..0393d759f 100644
--- a/banyand/liaison/grpc/property.go
+++ b/banyand/liaison/grpc/property.go
@@ -28,13 +28,14 @@ import (
 
        "github.com/pkg/errors"
        "go.uber.org/multierr"
+       "google.golang.org/grpc/codes"
+       "google.golang.org/grpc/status"
        "google.golang.org/protobuf/encoding/protojson"
        "google.golang.org/protobuf/types/known/timestamppb"
 
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/api/data"
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
-       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        propertyv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
        "github.com/apache/skywalking-banyandb/banyand/metadata"
@@ -148,8 +149,8 @@ func (ps *propertyServer) validatePropertyTags(ctx 
context.Context, property *pr
                found := false
                for _, ts := range propSchema.Tags {
                        if ts.Name == tag.Key {
-                               typ := 
databasev1.TagType(pbv1.MustTagValueToValueType(tag.Value))
-                               if typ != 
databasev1.TagType_TAG_TYPE_UNSPECIFIED && ts.Type != typ {
+                               typ := pbv1.MustTagValueToValueType(tag.Value)
+                               if typ != pbv1.ValueTypeUnknown && 
pbv1.MustTagValueSpecToValueType(ts.Type) != typ {
                                        return errors.Errorf("property %s tag 
%s type mismatch", property.Metadata.Name, tag.Key)
                                }
                                found = true
@@ -191,6 +192,10 @@ func (ps *propertyServer) Apply(ctx context.Context, req 
*propertyv1.ApplyReques
                return nil, err
        }
        g := req.Property.Metadata.Group
+       if acquireErr := ps.groupRepo.acquireRequest(g); acquireErr != nil {
+               return nil, status.Errorf(codes.FailedPrecondition, "group %s 
is pending deletion", g)
+       }
+       defer ps.groupRepo.releaseRequest(g)
        ps.metrics.totalStarted.Inc(1, g, "property", "apply")
        start := time.Now()
        defer func() {
@@ -370,6 +375,10 @@ func (ps *propertyServer) Delete(ctx context.Context, req 
*propertyv1.DeleteRequ
                return nil, schema.BadRequest("name", "name should not be nil")
        }
        g := req.Group
+       if acquireErr := ps.groupRepo.acquireRequest(g); acquireErr != nil {
+               return nil, status.Errorf(codes.FailedPrecondition, "group %s 
is pending deletion", g)
+       }
+       defer ps.groupRepo.releaseRequest(g)
        ps.metrics.totalStarted.Inc(1, g, "property", "delete")
        start := time.Now()
        defer func() {
@@ -415,6 +424,16 @@ func (ps *propertyServer) Delete(ctx context.Context, req 
*propertyv1.DeleteRequ
 }
 
 func (ps *propertyServer) Query(ctx context.Context, req 
*propertyv1.QueryRequest) (resp *propertyv1.QueryResponse, err error) {
+       for _, g := range req.Groups {
+               if acquireErr := ps.groupRepo.acquireRequest(g); acquireErr != 
nil {
+                       return nil, status.Errorf(codes.FailedPrecondition, 
"group %s is pending deletion", g)
+               }
+       }
+       defer func() {
+               for _, g := range req.Groups {
+                       ps.groupRepo.releaseRequest(g)
+               }
+       }()
        ps.metrics.totalStarted.Inc(1, "", "property", "query")
        start := time.Now()
        defer func() {
diff --git a/banyand/liaison/grpc/registry.go b/banyand/liaison/grpc/registry.go
index 8c0a2dc48..1e9af0c15 100644
--- a/banyand/liaison/grpc/registry.go
+++ b/banyand/liaison/grpc/registry.go
@@ -577,8 +577,9 @@ func (rs *measureRegistryServer) Exist(ctx context.Context,
 
 type groupRegistryServer struct {
        databasev1.UnimplementedGroupRegistryServiceServer
-       schemaRegistry metadata.Repo
-       metrics        *metrics
+       schemaRegistry      metadata.Repo
+       deletionTaskManager *groupDeletionTaskManager
+       metrics             *metrics
 }
 
 func (rs *groupRegistryServer) Create(ctx context.Context, req 
*databasev1.GroupRegistryServiceCreateRequest) (
@@ -618,20 +619,53 @@ func (rs *groupRegistryServer) Update(ctx 
context.Context, req *databasev1.Group
 func (rs *groupRegistryServer) Delete(ctx context.Context, req 
*databasev1.GroupRegistryServiceDeleteRequest) (
        *databasev1.GroupRegistryServiceDeleteResponse, error,
 ) {
-       g := ""
+       g := req.GetGroup()
        rs.metrics.totalRegistryStarted.Inc(1, g, "group", "delete")
        start := time.Now()
        defer func() {
                rs.metrics.totalRegistryFinished.Inc(1, g, "group", "delete")
                
rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "group", 
"delete")
        }()
-       deleted, err := rs.schemaRegistry.GroupRegistry().DeleteGroup(ctx, 
req.GetGroup())
-       if err != nil {
+       if g == internalDeletionTaskGroup {
                rs.metrics.totalRegistryErr.Inc(1, g, "group", "delete")
-               return nil, err
+               return nil, status.Errorf(codes.PermissionDenied, "cannot 
delete internal system group %s", g)
+       }
+       if _, getErr := rs.schemaRegistry.GroupRegistry().GetGroup(ctx, g); 
getErr != nil {
+               rs.metrics.totalRegistryErr.Inc(1, g, "group", "delete")
+               return nil, getErr
+       }
+       if req.GetDryRun() {
+               return rs.dryRunDelete(ctx, g)
+       }
+       if !req.GetForce() {
+               hasResources, checkErr := 
rs.deletionTaskManager.hasNonEmptyResources(ctx, g)
+               if checkErr != nil {
+                       rs.metrics.totalRegistryErr.Inc(1, g, "group", "delete")
+                       return nil, checkErr
+               }
+               if hasResources {
+                       rs.metrics.totalRegistryErr.Inc(1, g, "group", "delete")
+                       return nil, status.Errorf(codes.FailedPrecondition,
+                               "group %s is not empty, use force=true to 
delete non-empty groups", g)
+               }
+       }
+       if startErr := rs.deletionTaskManager.startDeletion(ctx, g, 
req.GetDataOnly()); startErr != nil {
+               rs.metrics.totalRegistryErr.Inc(1, g, "group", "delete")
+               return nil, startErr
+       }
+       return &databasev1.GroupRegistryServiceDeleteResponse{}, nil
+}
+
+func (rs *groupRegistryServer) dryRunDelete(ctx context.Context, g string) (
+       *databasev1.GroupRegistryServiceDeleteResponse, error,
+) {
+       schemaInfo, schemaErr := rs.collectSchemaInfo(ctx, g)
+       if schemaErr != nil {
+               rs.metrics.totalRegistryErr.Inc(1, g, "group", "delete")
+               return nil, schemaErr
        }
        return &databasev1.GroupRegistryServiceDeleteResponse{
-               Deleted: deleted,
+               SchemaInfo: schemaInfo,
        }, nil
 }
 
@@ -738,10 +772,24 @@ func (rs *groupRegistryServer) Inspect(ctx 
context.Context, req *databasev1.Grou
        }, nil
 }
 
-func (rs *groupRegistryServer) Query(_ context.Context, _ 
*databasev1.GroupRegistryServiceQueryRequest) (
+func (rs *groupRegistryServer) Query(ctx context.Context, req 
*databasev1.GroupRegistryServiceQueryRequest) (
        *databasev1.GroupRegistryServiceQueryResponse, error,
 ) {
-       return nil, status.Error(codes.Unimplemented, "Query method not 
implemented yet")
+       g := req.GetGroup()
+       rs.metrics.totalRegistryStarted.Inc(1, g, "group", "query")
+       start := time.Now()
+       defer func() {
+               rs.metrics.totalRegistryFinished.Inc(1, g, "group", "query")
+               
rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "group", 
"query")
+       }()
+       task, queryErr := rs.deletionTaskManager.getDeletionTask(ctx, g)
+       if queryErr != nil {
+               rs.metrics.totalRegistryErr.Inc(1, g, "group", "query")
+               return nil, queryErr
+       }
+       return &databasev1.GroupRegistryServiceQueryResponse{
+               Task: task,
+       }, nil
 }
 
 func (rs *groupRegistryServer) collectSchemaInfo(ctx context.Context, group 
string) (*databasev1.SchemaInfo, error) {
diff --git a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go
index 5fb4b2c1b..a78ebd004 100644
--- a/banyand/liaison/grpc/server.go
+++ b/banyand/liaison/grpc/server.go
@@ -141,7 +141,10 @@ func NewServer(_ context.Context, tir1Client, tir2Client, 
broadcaster queue.Clie
        schemaRegistry metadata.Repo, nr NodeRegistries, omr 
observability.MetricsRegistry,
        protectorService protector.Memory, routeProviders 
map[string]route.TableProvider,
 ) Server {
-       gr := &groupRepo{resourceOpts: make(map[string]*commonv1.ResourceOpts)}
+       gr := &groupRepo{
+               resourceOpts: make(map[string]*commonv1.ResourceOpts),
+               inflight:     make(map[string]*groupInflight),
+       }
        er := &entityRepo{entitiesMap: make(map[identity]partition.Locator), 
measureMap: make(map[identity]*databasev1.Measure)}
        streamSVC := &streamService{
                discoveryService: newDiscoveryService(schema.KindStream, 
schemaRegistry, nr.StreamLiaisonNodeRegistry, gr),
@@ -247,6 +250,12 @@ func (s *server) PreRun(ctx context.Context) error {
        s.traceSVC.setLogger(s.log.Named("trace"))
        s.propertyServer.SetLogger(s.log)
        s.bydbQLSVC.setLogger(s.log.Named("bydbql"))
+       s.groupRegistryServer.deletionTaskManager = newGroupDeletionTaskManager(
+               s.groupRegistryServer.schemaRegistry, s.propertyServer, 
s.groupRepo, s.log.Named("group-deletion"),
+       )
+       if initErr := 
s.groupRegistryServer.deletionTaskManager.initPropertyStorage(ctx); initErr != 
nil {
+               return initErr
+       }
        components := []*discoveryService{
                s.streamSVC.discoveryService,
                s.measureSVC.discoveryService,
diff --git a/banyand/liaison/grpc/stream.go b/banyand/liaison/grpc/stream.go
index af570cfaa..ba26b3ef4 100644
--- a/banyand/liaison/grpc/stream.go
+++ b/banyand/liaison/grpc/stream.go
@@ -285,7 +285,13 @@ func (s *streamService) Write(stream 
streamv1.StreamService_WriteServer) error {
                requestCount++
                s.metrics.totalStreamMsgReceived.Inc(1, metadata.Group, 
"stream", "write")
 
+               if acquireErr := s.groupRepo.acquireRequest(metadata.Group); 
acquireErr != nil {
+                       s.sendReply(metadata, 
modelv1.Status_STATUS_INTERNAL_ERROR, writeEntity.GetMessageId(), stream)
+                       continue
+               }
+
                if s.validateWriteRequest(writeEntity, metadata, stream) != 
modelv1.Status_STATUS_SUCCEED {
+                       s.groupRepo.releaseRequest(metadata.Group)
                        continue
                }
 
@@ -293,6 +299,7 @@ func (s *streamService) Write(stream 
streamv1.StreamService_WriteServer) error {
                if err != nil {
                        s.l.Error().Err(err).RawJSON("written", 
logger.Proto(writeEntity)).Msg("navigation failed")
                        s.sendReply(metadata, 
modelv1.Status_STATUS_INTERNAL_ERROR, writeEntity.GetMessageId(), stream)
+                       s.groupRepo.releaseRequest(metadata.Group)
                        continue
                }
 
@@ -306,8 +313,10 @@ func (s *streamService) Write(stream 
streamv1.StreamService_WriteServer) error {
                if err != nil {
                        s.l.Error().Err(err).RawJSON("written", 
logger.Proto(writeEntity)).Msg("publishing failed")
                        s.sendReply(metadata, 
modelv1.Status_STATUS_INTERNAL_ERROR, writeEntity.GetMessageId(), stream)
+                       s.groupRepo.releaseRequest(metadata.Group)
                        continue
                }
+               s.groupRepo.releaseRequest(metadata.Group)
 
                succeedSent = append(succeedSent, succeedSentMessage{
                        metadata:  metadata,
@@ -320,6 +329,16 @@ func (s *streamService) Write(stream 
streamv1.StreamService_WriteServer) error {
 var emptyStreamQueryResponse = &streamv1.QueryResponse{Elements: 
make([]*streamv1.Element, 0)}
 
 func (s *streamService) Query(ctx context.Context, req *streamv1.QueryRequest) 
(resp *streamv1.QueryResponse, err error) {
+       for _, g := range req.Groups {
+               if acquireErr := s.groupRepo.acquireRequest(g); acquireErr != 
nil {
+                       return nil, status.Errorf(codes.FailedPrecondition, 
"group %s is pending deletion", g)
+               }
+       }
+       defer func() {
+               for _, g := range req.Groups {
+                       s.groupRepo.releaseRequest(g)
+               }
+       }()
        for _, g := range req.Groups {
                s.metrics.totalStarted.Inc(1, g, "stream", "query")
        }
diff --git a/banyand/liaison/grpc/trace.go b/banyand/liaison/grpc/trace.go
index 083927c8e..3143d6418 100644
--- a/banyand/liaison/grpc/trace.go
+++ b/banyand/liaison/grpc/trace.go
@@ -374,7 +374,13 @@ func (s *traceService) Write(stream 
tracev1.TraceService_WriteServer) error {
                requestCount++
                s.metrics.totalStreamMsgReceived.Inc(1, metadata.Group, 
"trace", "write")
 
+               if acquireErr := s.groupRepo.acquireRequest(metadata.Group); 
acquireErr != nil {
+                       s.sendReply(metadata, 
modelv1.Status_STATUS_INTERNAL_ERROR, writeEntity.GetVersion(), stream)
+                       continue
+               }
+
                if s.validateWriteRequest(writeEntity, metadata, specLocator, 
stream) != modelv1.Status_STATUS_SUCCEED {
+                       s.groupRepo.releaseRequest(metadata.Group)
                        continue
                }
 
@@ -382,6 +388,7 @@ func (s *traceService) Write(stream 
tracev1.TraceService_WriteServer) error {
                if err != nil {
                        s.l.Error().Err(err).RawJSON("written", 
logger.Proto(writeEntity)).Msg("navigation failed")
                        s.sendReply(metadata, 
modelv1.Status_STATUS_INTERNAL_ERROR, writeEntity.GetVersion(), stream)
+                       s.groupRepo.releaseRequest(metadata.Group)
                        continue
                }
 
@@ -395,8 +402,10 @@ func (s *traceService) Write(stream 
tracev1.TraceService_WriteServer) error {
                if err != nil {
                        s.l.Error().Err(err).RawJSON("written", 
logger.Proto(writeEntity)).Msg("publishing failed")
                        s.sendReply(metadata, 
modelv1.Status_STATUS_INTERNAL_ERROR, writeEntity.GetVersion(), stream)
+                       s.groupRepo.releaseRequest(metadata.Group)
                        continue
                }
+               s.groupRepo.releaseRequest(metadata.Group)
 
                succeedSent = append(succeedSent, succeedSentMessage{
                        metadata:  metadata,
@@ -409,6 +418,16 @@ func (s *traceService) Write(stream 
tracev1.TraceService_WriteServer) error {
 var emptyTraceQueryResponse = &tracev1.QueryResponse{Traces: 
make([]*tracev1.Trace, 0)}
 
 func (s *traceService) Query(ctx context.Context, req *tracev1.QueryRequest) 
(resp *tracev1.QueryResponse, err error) {
+       for _, g := range req.Groups {
+               if acquireErr := s.groupRepo.acquireRequest(g); acquireErr != 
nil {
+                       return nil, status.Errorf(codes.FailedPrecondition, 
"group %s is pending deletion", g)
+               }
+       }
+       defer func() {
+               for _, g := range req.Groups {
+                       s.groupRepo.releaseRequest(g)
+               }
+       }()
        for _, g := range req.Groups {
                s.metrics.totalStarted.Inc(1, g, "trace", "query")
        }
diff --git a/banyand/measure/svc_data.go b/banyand/measure/svc_data.go
index bf7c9fedf..21ff035c5 100644
--- a/banyand/measure/svc_data.go
+++ b/banyand/measure/svc_data.go
@@ -91,6 +91,10 @@ func (s *dataSVC) LoadGroup(name string) 
(resourceSchema.Group, bool) {
        return s.schemaRepo.LoadGroup(name)
 }
 
+func (s *dataSVC) DropGroup(_ context.Context, groupName string) error {
+       return s.schemaRepo.DropGroup(groupName)
+}
+
 func (s *dataSVC) GetRemovalSegmentsTimeRange(group string) 
*timestamp.TimeRange {
        return s.schemaRepo.GetRemovalSegmentsTimeRange(group)
 }
@@ -275,6 +279,9 @@ func (s *dataSVC) PreRun(ctx context.Context) error {
        if subscribeErr := 
s.pipeline.Subscribe(data.TopicMeasureCollectDataInfo, 
collectDataInfoListener); subscribeErr != nil {
                return fmt.Errorf("failed to subscribe to collect data info 
topic: %w", subscribeErr)
        }
+       if dropGroupErr := s.pipeline.Subscribe(data.TopicMeasureDropGroup, 
&dropGroupDataListener{s: s}); dropGroupErr != nil {
+               return fmt.Errorf("failed to subscribe to drop group topic: 
%w", dropGroupErr)
+       }
 
        if err = s.createDataNativeObservabilityGroup(ctx); err != nil {
                return err
@@ -588,3 +595,19 @@ func (l *collectDataInfoListener) Rev(ctx context.Context, 
message bus.Message)
        }
        return bus.NewMessage(message.ID(), dataInfo)
 }
+
+type dropGroupDataListener struct {
+       *bus.UnImplementedHealthyListener
+       s *dataSVC
+}
+
+func (l *dropGroupDataListener) Rev(ctx context.Context, message bus.Message) 
bus.Message {
+       req, ok := 
message.Data().(*databasev1.GroupRegistryServiceDeleteRequest)
+       if !ok {
+               return bus.NewMessage(message.ID(), common.NewError("invalid 
data type for drop group request"))
+       }
+       if dropErr := l.s.DropGroup(ctx, req.Group); dropErr != nil {
+               return bus.NewMessage(message.ID(), common.NewError("failed to 
drop group data: %v", dropErr))
+       }
+       return bus.NewMessage(message.ID(), req)
+}
diff --git a/banyand/measure/svc_liaison.go b/banyand/measure/svc_liaison.go
index 6e946aba5..7212c3316 100644
--- a/banyand/measure/svc_liaison.go
+++ b/banyand/measure/svc_liaison.go
@@ -75,6 +75,10 @@ func (s *liaison) LoadGroup(name string) 
(resourceSchema.Group, bool) {
        return s.schemaRepo.LoadGroup(name)
 }
 
+func (s *liaison) DropGroup(_ context.Context, groupName string) error {
+       return s.schemaRepo.DropGroup(groupName)
+}
+
 func (s *liaison) GetRemovalSegmentsTimeRange(group string) 
*timestamp.TimeRange {
        return s.schemaRepo.GetRemovalSegmentsTimeRange(group)
 }
@@ -188,12 +192,16 @@ func (s *liaison) PreRun(ctx context.Context) error {
        }
        if metaSvc, ok := s.metadata.(metadata.Service); ok {
                
metaSvc.RegisterLiaisonCollector(commonv1.Catalog_CATALOG_MEASURE, s)
+               
metaSvc.RegisterGroupDropHandler(commonv1.Catalog_CATALOG_MEASURE, s)
        }
 
        collectLiaisonInfoListener := &collectLiaisonInfoListener{s: s}
        if subscribeErr := 
s.pipeline.Subscribe(data.TopicMeasureCollectLiaisonInfo, 
collectLiaisonInfoListener); subscribeErr != nil {
                return fmt.Errorf("failed to subscribe to collect liaison info 
topic: %w", subscribeErr)
        }
+       if subscribeErr := s.pipeline.Subscribe(data.TopicMeasureDropGroup, 
&dropGroupLiaisonListener{s: s}); subscribeErr != nil {
+               return fmt.Errorf("failed to subscribe to drop group topic: 
%w", subscribeErr)
+       }
 
        return topNResultPipeline.Subscribe(data.TopicMeasureWrite, 
writeListener)
 }
@@ -238,3 +246,19 @@ func (l *collectLiaisonInfoListener) Rev(ctx 
context.Context, message bus.Messag
        }
        return bus.NewMessage(message.ID(), liaisonInfo)
 }
+
+type dropGroupLiaisonListener struct {
+       *bus.UnImplementedHealthyListener
+       s *liaison
+}
+
+func (l *dropGroupLiaisonListener) Rev(ctx context.Context, message 
bus.Message) bus.Message {
+       req, ok := 
message.Data().(*databasev1.GroupRegistryServiceDeleteRequest)
+       if !ok {
+               return bus.NewMessage(message.ID(), common.NewError("invalid 
data type for drop group liaison request"))
+       }
+       if dropErr := l.s.DropGroup(ctx, req.Group); dropErr != nil {
+               return bus.NewMessage(message.ID(), common.NewError("failed to 
drop group data on liaison: %v", dropErr))
+       }
+       return bus.NewMessage(message.ID(), req)
+}
diff --git a/banyand/measure/svc_standalone.go 
b/banyand/measure/svc_standalone.go
index 97e65e9b7..ebb4f9f7c 100644
--- a/banyand/measure/svc_standalone.go
+++ b/banyand/measure/svc_standalone.go
@@ -98,6 +98,10 @@ func (s *standalone) LoadGroup(name string) 
(resourceSchema.Group, bool) {
        return s.schemaRepo.LoadGroup(name)
 }
 
+func (s *standalone) DropGroup(_ context.Context, groupName string) error {
+       return s.schemaRepo.DropGroup(groupName)
+}
+
 func (s *standalone) GetRemovalSegmentsTimeRange(group string) 
*timestamp.TimeRange {
        return s.schemaRepo.GetRemovalSegmentsTimeRange(group)
 }
@@ -274,6 +278,7 @@ func (s *standalone) PreRun(ctx context.Context) error {
        if metaSvc, ok := s.metadata.(metadata.Service); ok {
                metaSvc.RegisterDataCollector(commonv1.Catalog_CATALOG_MEASURE, 
s.schemaRepo)
                
metaSvc.RegisterLiaisonCollector(commonv1.Catalog_CATALOG_MEASURE, s)
+               
metaSvc.RegisterGroupDropHandler(commonv1.Catalog_CATALOG_MEASURE, s)
        }
 
        s.cm = newCacheMetrics(s.omr)
diff --git a/banyand/metadata/client.go b/banyand/metadata/client.go
index 2cbaa503c..0608e19ce 100644
--- a/banyand/metadata/client.go
+++ b/banyand/metadata/client.go
@@ -669,6 +669,10 @@ func (s *clientService) CollectLiaisonInfo(ctx 
context.Context, group string) ([
        return s.infoCollectorRegistry.CollectLiaisonInfo(ctx, group)
 }
 
+func (s *clientService) DropGroup(ctx context.Context, catalog 
commonv1.Catalog, group string) error {
+       return s.infoCollectorRegistry.DropGroup(ctx, catalog, group)
+}
+
 func (s *clientService) RegisterDataCollector(catalog commonv1.Catalog, 
collector schema.DataInfoCollector) {
        s.infoCollectorRegistry.RegisterDataCollector(catalog, collector)
 }
@@ -677,6 +681,10 @@ func (s *clientService) RegisterLiaisonCollector(catalog 
commonv1.Catalog, colle
        s.infoCollectorRegistry.RegisterLiaisonCollector(catalog, collector)
 }
 
+func (s *clientService) RegisterGroupDropHandler(catalog commonv1.Catalog, 
handler schema.GroupDropHandler) {
+       s.infoCollectorRegistry.RegisterGroupDropHandler(catalog, handler)
+}
+
 func (s *clientService) SetDataBroadcaster(broadcaster bus.Broadcaster) {
        s.dataBroadcaster = broadcaster
 }
diff --git a/banyand/metadata/metadata.go b/banyand/metadata/metadata.go
index 2e21ffbc8..ac5b6508c 100644
--- a/banyand/metadata/metadata.go
+++ b/banyand/metadata/metadata.go
@@ -55,6 +55,7 @@ type Repo interface {
        PropertyRegistry() schema.Property
        CollectDataInfo(context.Context, string) ([]*databasev1.DataInfo, error)
        CollectLiaisonInfo(context.Context, string) ([]*databasev1.LiaisonInfo, 
error)
+       DropGroup(ctx context.Context, catalog commonv1.Catalog, group string) 
error
 }
 
 // Service is the metadata repository.
@@ -69,4 +70,5 @@ type Service interface {
        SetLiaisonBroadcaster(broadcaster bus.Broadcaster)
        RegisterDataCollector(catalog commonv1.Catalog, collector 
schema.DataInfoCollector)
        RegisterLiaisonCollector(catalog commonv1.Catalog, collector 
schema.LiaisonInfoCollector)
+       RegisterGroupDropHandler(catalog commonv1.Catalog, handler 
schema.GroupDropHandler)
 }
diff --git a/banyand/metadata/schema/collector.go 
b/banyand/metadata/schema/collector.go
index 6f348d91b..d8ea0e1e0 100644
--- a/banyand/metadata/schema/collector.go
+++ b/banyand/metadata/schema/collector.go
@@ -23,6 +23,8 @@ import (
        "sync"
        "time"
 
+       "go.uber.org/multierr"
+
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/api/data"
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
@@ -36,11 +38,17 @@ type GroupGetter interface {
        GetGroup(ctx context.Context, group string) (*commonv1.Group, error)
 }
 
+// GroupDropHandler handles dropping group physical data files on the local 
node.
+type GroupDropHandler interface {
+       DropGroup(ctx context.Context, group string) error
+}
+
 // InfoCollectorRegistry manages data and liaison info collectors.
 type InfoCollectorRegistry struct {
        groupGetter        GroupGetter
        dataCollectors     map[commonv1.Catalog]DataInfoCollector
        liaisonCollectors  map[commonv1.Catalog]LiaisonInfoCollector
+       dropHandlers       map[commonv1.Catalog]GroupDropHandler
        dataBroadcaster    bus.Broadcaster
        liaisonBroadcaster bus.Broadcaster
        l                  *logger.Logger
@@ -53,6 +61,7 @@ func NewInfoCollectorRegistry(l *logger.Logger, groupGetter 
GroupGetter) *InfoCo
                groupGetter:       groupGetter,
                dataCollectors:    make(map[commonv1.Catalog]DataInfoCollector),
                liaisonCollectors: 
make(map[commonv1.Catalog]LiaisonInfoCollector),
+               dropHandlers:      make(map[commonv1.Catalog]GroupDropHandler),
                l:                 l,
        }
 }
@@ -199,6 +208,72 @@ func (icr *InfoCollectorRegistry) 
collectLiaisonInfoLocal(ctx context.Context, c
        return nil, nil
 }
 
+// DropGroup drops the group data files on all nodes.
+func (icr *InfoCollectorRegistry) DropGroup(ctx context.Context, catalog 
commonv1.Catalog, group string) error {
+       if localErr := icr.dropGroupLocal(ctx, catalog, group); localErr != nil 
{
+               return fmt.Errorf("failed to drop group locally: %w", localErr)
+       }
+       icr.mux.RLock()
+       dataBroadcaster := icr.dataBroadcaster
+       liaisonBroadcaster := icr.liaisonBroadcaster
+       icr.mux.RUnlock()
+
+       var topic bus.Topic
+       switch catalog {
+       case commonv1.Catalog_CATALOG_MEASURE:
+               topic = data.TopicMeasureDropGroup
+       case commonv1.Catalog_CATALOG_STREAM:
+               topic = data.TopicStreamDropGroup
+       case commonv1.Catalog_CATALOG_TRACE:
+               topic = data.TopicTraceDropGroup
+       default:
+               return nil
+       }
+
+       var errs []error
+       if dataBroadcaster != nil {
+               if broadcastErr := icr.broadcastDropGroup(dataBroadcaster, 
topic, group); broadcastErr != nil {
+                       errs = append(errs, fmt.Errorf("data nodes: %w", 
broadcastErr))
+               }
+       }
+       if liaisonBroadcaster != nil {
+               if broadcastErr := icr.broadcastDropGroup(liaisonBroadcaster, 
topic, group); broadcastErr != nil {
+                       errs = append(errs, fmt.Errorf("liaison nodes: %w", 
broadcastErr))
+               }
+       }
+       return multierr.Combine(errs...)
+}
+
+func (icr *InfoCollectorRegistry) broadcastDropGroup(broadcaster 
bus.Broadcaster, topic bus.Topic, group string) error {
+       message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), 
&databasev1.GroupRegistryServiceDeleteRequest{Group: group})
+       futures, broadcastErr := broadcaster.Broadcast(30*time.Second, topic, 
message)
+       if broadcastErr != nil {
+               return fmt.Errorf("failed to broadcast drop group request: %w", 
broadcastErr)
+       }
+       var errs []error
+       for _, future := range futures {
+               msg, getErr := future.Get()
+               if getErr != nil {
+                       errs = append(errs, getErr)
+                       continue
+               }
+               if errMsg, ok := msg.Data().(*common.Error); ok {
+                       errs = append(errs, fmt.Errorf("node reported error 
dropping group: %s", errMsg.Error()))
+               }
+       }
+       return multierr.Combine(errs...)
+}
+
+func (icr *InfoCollectorRegistry) dropGroupLocal(ctx context.Context, catalog 
commonv1.Catalog, group string) error {
+       icr.mux.RLock()
+       handler, hasHandler := icr.dropHandlers[catalog]
+       icr.mux.RUnlock()
+       if hasHandler && handler != nil {
+               return handler.DropGroup(ctx, group)
+       }
+       return nil
+}
+
 // RegisterDataCollector registers a data info collector for a specific 
catalog.
 func (icr *InfoCollectorRegistry) RegisterDataCollector(catalog 
commonv1.Catalog, collector DataInfoCollector) {
        icr.mux.Lock()
@@ -213,6 +288,13 @@ func (icr *InfoCollectorRegistry) 
RegisterLiaisonCollector(catalog commonv1.Cata
        icr.liaisonCollectors[catalog] = collector
 }
 
+// RegisterGroupDropHandler registers a group drop handler for a specific 
catalog.
+func (icr *InfoCollectorRegistry) RegisterGroupDropHandler(catalog 
commonv1.Catalog, handler GroupDropHandler) {
+       icr.mux.Lock()
+       defer icr.mux.Unlock()
+       icr.dropHandlers[catalog] = handler
+}
+
 // SetDataBroadcaster sets the broadcaster for data info collection.
 func (icr *InfoCollectorRegistry) SetDataBroadcaster(broadcaster 
bus.Broadcaster) {
        icr.mux.Lock()
diff --git a/banyand/property/db/db.go b/banyand/property/db/db.go
index 7b3c2b88f..8c535b4d3 100644
--- a/banyand/property/db/db.go
+++ b/banyand/property/db/db.go
@@ -21,6 +21,7 @@ package db
 import (
        "context"
        "errors"
+       "fmt"
        "path"
        "path/filepath"
        "strconv"
@@ -75,6 +76,8 @@ type Database interface {
        Repair(ctx context.Context, id []byte, shardID uint64, property 
*propertyv1.Property, deleteTime int64) error
        // TakeSnapShot takes a snapshot of the database.
        TakeSnapShot(ctx context.Context, sn string) *databasev1.Snapshot
+       // Drop closes and removes all shards for the given group and deletes 
the group directory.
+       Drop(groupName string) error
        // RegisterGossip registers the repair scheduler's gossip services with 
the given messenger.
        RegisterGossip(messenger gossip.Messenger)
        // Close closes the database.
@@ -380,6 +383,31 @@ func (db *database) getShard(group string, id 
common.ShardID) (*shard, bool) {
        return nil, false
 }
 
+// Drop closes and removes all shards for the given group and deletes the 
group directory.
+func (db *database) Drop(groupName string) (err error) {
+       value, ok := db.groups.LoadAndDelete(groupName)
+       if !ok {
+               return nil
+       }
+       gs := value.(*groupShards)
+       sLst := gs.shards.Load()
+       if sLst != nil {
+               for _, s := range *sLst {
+                       multierr.AppendInto(&err, s.close())
+               }
+               if err != nil {
+                       return err
+               }
+       }
+       defer func() {
+               if r := recover(); r != nil {
+                       err = fmt.Errorf("failed to remove group directory %s: 
%v", gs.location, r)
+               }
+       }()
+       db.lfs.MustRMAll(gs.location)
+       return nil
+}
+
 // RegisterGossip registers the repair scheduler's gossip services with the 
given messenger.
 func (db *database) RegisterGossip(messenger gossip.Messenger) {
        if db.repairScheduler == nil {
diff --git a/banyand/property/service.go b/banyand/property/service.go
index c7bded2c3..5309245ec 100644
--- a/banyand/property/service.go
+++ b/banyand/property/service.go
@@ -31,9 +31,11 @@ import (
 
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/api/data"
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        "github.com/apache/skywalking-banyandb/banyand/internal/storage"
        "github.com/apache/skywalking-banyandb/banyand/metadata"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
        "github.com/apache/skywalking-banyandb/banyand/observability"
        obsservice 
"github.com/apache/skywalking-banyandb/banyand/observability/services"
        "github.com/apache/skywalking-banyandb/banyand/property/db"
@@ -63,19 +65,19 @@ type service struct {
        omr                      observability.MetricsRegistry
        lfs                      fs.FileSystem
        pm                       protector.Memory
-       closer                   *run.Closer
        db                       db.Database
+       closer                   *run.Closer
        l                        *logger.Logger
-       nodeID                   string
        repairScheduler          *repairScheduler
-       root                     string
        snapshotDir              string
+       root                     string
        repairDir                string
        repairBuildTreeCron      string
        repairTriggerCron        string
-       flushTimeout             time.Duration
-       expireTimeout            time.Duration
+       nodeID                   string
        repairQuickBuildTreeTime time.Duration
+       expireTimeout            time.Duration
+       flushTimeout             time.Duration
        repairTreeSlotCount      int
        maxDiskUsagePercent      int
        maxFileSnapshotNum       int
@@ -201,6 +203,7 @@ func (s *service) PreRun(ctx context.Context) error {
                }
                s.repairScheduler = scheduler
        }
+       s.metadata.RegisterHandler("property", schema.KindGroup, 
&propertyGroupEventHandler{svc: s})
        return multierr.Combine(
                s.pipeline.Subscribe(data.TopicPropertyUpdate, 
&updateListener{s: s, l: s.l, path: path, maxDiskUsagePercent: 
s.maxDiskUsagePercent}),
                s.pipeline.Subscribe(data.TopicPropertyDelete, 
&deleteListener{s: s}),
@@ -210,6 +213,30 @@ func (s *service) PreRun(ctx context.Context) error {
        )
 }
 
+type propertyGroupEventHandler struct {
+       svc *service
+}
+
+func (h *propertyGroupEventHandler) OnInit([]schema.Kind) (bool, []int64) {
+       return false, nil
+}
+
+func (h *propertyGroupEventHandler) OnAddOrUpdate(_ schema.Metadata) {}
+
+func (h *propertyGroupEventHandler) OnDelete(md schema.Metadata) {
+       if md.Kind != schema.KindGroup {
+               return
+       }
+       group := md.Spec.(*commonv1.Group)
+       if group.Catalog != commonv1.Catalog_CATALOG_PROPERTY {
+               return
+       }
+       groupName := group.Metadata.GetName()
+       if dropErr := h.svc.db.Drop(groupName); dropErr != nil {
+               h.svc.l.Error().Err(dropErr).Str("group", 
groupName).Msg("failed to drop group")
+       }
+}
+
 func (s *service) Serve() run.StopNotify {
        if s.gossipMessenger != nil {
                s.gossipMessenger.Serve(s.closer)
diff --git a/banyand/stream/svc_liaison.go b/banyand/stream/svc_liaison.go
index 49fe0f286..78da92c9c 100644
--- a/banyand/stream/svc_liaison.go
+++ b/banyand/stream/svc_liaison.go
@@ -76,6 +76,10 @@ func (s *liaison) LoadGroup(name string) 
(resourceSchema.Group, bool) {
        return s.schemaRepo.LoadGroup(name)
 }
 
+func (s *liaison) DropGroup(_ context.Context, groupName string) error {
+       return s.schemaRepo.DropGroup(groupName)
+}
+
 func (s *liaison) GetRemovalSegmentsTimeRange(group string) 
*timestamp.TimeRange {
        return s.schemaRepo.GetRemovalSegmentsTimeRange(group)
 }
@@ -196,12 +200,16 @@ func (s *liaison) PreRun(ctx context.Context) error {
 
        if metaSvc, ok := s.metadata.(metadata.Service); ok {
                
metaSvc.RegisterLiaisonCollector(commonv1.Catalog_CATALOG_STREAM, s)
+               
metaSvc.RegisterGroupDropHandler(commonv1.Catalog_CATALOG_STREAM, s)
        }
 
        collectLiaisonInfoListener := &collectLiaisonInfoListener{s: s}
        if subscribeErr := 
s.pipeline.Subscribe(data.TopicStreamCollectLiaisonInfo, 
collectLiaisonInfoListener); subscribeErr != nil {
                return fmt.Errorf("failed to subscribe to collect liaison info 
topic: %w", subscribeErr)
        }
+       if subscribeErr := s.pipeline.Subscribe(data.TopicStreamDropGroup, 
&dropGroupLiaisonListener{s: s}); subscribeErr != nil {
+               return fmt.Errorf("failed to subscribe to drop group topic: 
%w", subscribeErr)
+       }
 
        return s.pipeline.Subscribe(data.TopicStreamWrite, s.writeListener)
 }
@@ -246,3 +254,19 @@ func (l *collectLiaisonInfoListener) Rev(ctx 
context.Context, message bus.Messag
        }
        return bus.NewMessage(message.ID(), liaisonInfo)
 }
+
+type dropGroupLiaisonListener struct {
+       *bus.UnImplementedHealthyListener
+       s *liaison
+}
+
+func (l *dropGroupLiaisonListener) Rev(ctx context.Context, message 
bus.Message) bus.Message {
+       req, ok := 
message.Data().(*databasev1.GroupRegistryServiceDeleteRequest)
+       if !ok {
+               return bus.NewMessage(message.ID(), common.NewError("invalid 
data type for drop group liaison request"))
+       }
+       if dropErr := l.s.DropGroup(ctx, req.Group); dropErr != nil {
+               return bus.NewMessage(message.ID(), common.NewError("failed to 
drop group data on liaison: %v", dropErr))
+       }
+       return bus.NewMessage(message.ID(), req)
+}
diff --git a/banyand/stream/svc_standalone.go b/banyand/stream/svc_standalone.go
index 7174ae6ab..9dab8a867 100644
--- a/banyand/stream/svc_standalone.go
+++ b/banyand/stream/svc_standalone.go
@@ -97,6 +97,10 @@ func (s *standalone) LoadGroup(name string) 
(resourceSchema.Group, bool) {
        return s.schemaRepo.LoadGroup(name)
 }
 
+func (s *standalone) DropGroup(_ context.Context, groupName string) error {
+       return s.schemaRepo.DropGroup(groupName)
+}
+
 func (s *standalone) GetRemovalSegmentsTimeRange(group string) 
*timestamp.TimeRange {
        return s.schemaRepo.GetRemovalSegmentsTimeRange(group)
 }
@@ -255,6 +259,7 @@ func (s *standalone) PreRun(ctx context.Context) error {
        if metaSvc, ok := s.metadata.(metadata.Service); ok {
                metaSvc.RegisterDataCollector(commonv1.Catalog_CATALOG_STREAM, 
&s.schemaRepo)
                
metaSvc.RegisterLiaisonCollector(commonv1.Catalog_CATALOG_STREAM, s)
+               
metaSvc.RegisterGroupDropHandler(commonv1.Catalog_CATALOG_STREAM, s)
        }
        if s.pipeline == nil {
                return nil
@@ -264,6 +269,9 @@ func (s *standalone) PreRun(ctx context.Context) error {
        if subscribeErr := 
s.pipeline.Subscribe(data.TopicStreamCollectDataInfo, collectDataInfoListener); 
subscribeErr != nil {
                return fmt.Errorf("failed to subscribe to collect data info 
topic: %w", subscribeErr)
        }
+       if dropGroupErr := s.pipeline.Subscribe(data.TopicStreamDropGroup, 
&dropGroupDataListener{s: s}); dropGroupErr != nil {
+               return fmt.Errorf("failed to subscribe to drop group topic: 
%w", dropGroupErr)
+       }
 
        s.localPipeline = queue.Local()
        if err = s.pipeline.Subscribe(data.TopicSnapshot, &snapshotListener{s: 
s}); err != nil {
@@ -400,3 +408,19 @@ func (l *collectDataInfoListener) Rev(ctx context.Context, 
message bus.Message)
        }
        return bus.NewMessage(message.ID(), dataInfo)
 }
+
+type dropGroupDataListener struct {
+       *bus.UnImplementedHealthyListener
+       s *standalone
+}
+
+func (l *dropGroupDataListener) Rev(ctx context.Context, message bus.Message) 
bus.Message {
+       req, ok := 
message.Data().(*databasev1.GroupRegistryServiceDeleteRequest)
+       if !ok {
+               return bus.NewMessage(message.ID(), common.NewError("invalid 
data type for drop group request"))
+       }
+       if dropErr := l.s.DropGroup(ctx, req.Group); dropErr != nil {
+               return bus.NewMessage(message.ID(), common.NewError("failed to 
drop group data: %v", dropErr))
+       }
+       return bus.NewMessage(message.ID(), req)
+}
diff --git a/banyand/trace/handoff_controller.go 
b/banyand/trace/handoff_controller.go
index 18f8d66c2..bc45154f3 100644
--- a/banyand/trace/handoff_controller.go
+++ b/banyand/trace/handoff_controller.go
@@ -620,6 +620,52 @@ func (hc *handoffController) filterNodesForShard(nodes 
[]string, group string, s
        return filtered
 }
 
+func (hc *handoffController) deletePartsByGroup(group string) {
+       hc.mu.Lock()
+       defer hc.mu.Unlock()
+
+       var totalRemoved int
+       for nodeAddr, nodeQueue := range hc.nodeQueues {
+               pending, listErr := nodeQueue.listPending()
+               if listErr != nil {
+                       hc.l.Warn().Err(listErr).Str("node", 
nodeAddr).Msg("failed to list pending parts for group cleanup")
+                       continue
+               }
+               for _, ptp := range pending {
+                       meta, metaErr := nodeQueue.getMetadata(ptp.PartID, 
ptp.PartType)
+                       if metaErr != nil {
+                               hc.l.Warn().Err(metaErr).
+                                       Str("node", nodeAddr).
+                                       Uint64("partID", ptp.PartID).
+                                       Str("partType", ptp.PartType).
+                                       Msg("failed to read metadata during 
group cleanup")
+                               continue
+                       }
+                       if meta.Group != group {
+                               continue
+                       }
+                       if completeErr := nodeQueue.complete(ptp.PartID, 
ptp.PartType); completeErr != nil {
+                               hc.l.Warn().Err(completeErr).
+                                       Str("node", nodeAddr).
+                                       Uint64("partID", ptp.PartID).
+                                       Str("partType", ptp.PartType).
+                                       Msg("failed to remove part during group 
cleanup")
+                               continue
+                       }
+                       if meta.PartSizeBytes > 0 {
+                               hc.updateTotalSize(-int64(meta.PartSizeBytes))
+                       }
+                       totalRemoved++
+               }
+       }
+       if totalRemoved > 0 {
+               hc.l.Info().
+                       Str("group", group).
+                       Int("removedParts", totalRemoved).
+                       Msg("cleaned up handoff parts for deleted group")
+       }
+}
+
 // close closes the handoff controller.
 func (hc *handoffController) close() error {
        // Stop the monitor
diff --git a/banyand/trace/metadata.go b/banyand/trace/metadata.go
index 26f2e0ea3..7377c4201 100644
--- a/banyand/trace/metadata.go
+++ b/banyand/trace/metadata.go
@@ -58,11 +58,12 @@ type SchemaService interface {
 
 type schemaRepo struct {
        resourceSchema.Repository
-       l        *logger.Logger
-       metadata metadata.Repo
-       path     string
-       nodeID   string
-       role     databasev1.Role
+       onGroupDelete func(groupName string)
+       l             *logger.Logger
+       metadata      metadata.Repo
+       path          string
+       nodeID        string
+       role          databasev1.Role
 }
 
 func newSchemaRepo(path string, svc *standalone, nodeLabels map[string]string, 
nodeID string) schemaRepo {
@@ -96,6 +97,9 @@ func newLiaisonSchemaRepo(path string, svc *liaison, 
traceDataNodeRegistry grpc.
                        resourceSchema.NewMetrics(svc.omr.With(metadataScope)),
                ),
        }
+       if svc.handoffCtrl != nil {
+               sr.onGroupDelete = svc.handoffCtrl.deletePartsByGroup
+       }
        sr.start()
        return sr
 }
@@ -200,6 +204,9 @@ func (sr *schemaRepo) OnDelete(metadata schema.Metadata) {
                if g.Catalog != commonv1.Catalog_CATALOG_TRACE {
                        return
                }
+               if sr.onGroupDelete != nil {
+                       sr.onGroupDelete(g.Metadata.Name)
+               }
                sr.SendMetadataEvent(resourceSchema.MetadataEvent{
                        Typ:      resourceSchema.EventDelete,
                        Kind:     resourceSchema.EventKindGroup,
diff --git a/banyand/trace/svc_liaison.go b/banyand/trace/svc_liaison.go
index 9ea23d179..5532ce011 100644
--- a/banyand/trace/svc_liaison.go
+++ b/banyand/trace/svc_liaison.go
@@ -104,6 +104,22 @@ func (cl *collectLiaisonInfoListener) Rev(ctx 
context.Context, message bus.Messa
        return bus.NewMessage(message.ID(), liaisonInfo)
 }
 
+type dropGroupLiaisonListener struct {
+       *bus.UnImplementedHealthyListener
+       l *liaison
+}
+
+func (dl *dropGroupLiaisonListener) Rev(ctx context.Context, message bus.Message) bus.Message {
+       req, ok := message.Data().(*databasev1.GroupRegistryServiceDeleteRequest)
+       if !ok {
+               return bus.NewMessage(message.ID(), common.NewError("invalid data type for drop group liaison request"))
+       }
+       if dropErr := dl.l.DropGroup(ctx, req.Group); dropErr != nil {
+               return bus.NewMessage(message.ID(), common.NewError("failed to drop group data on liaison: %v", dropErr))
+       }
+       return bus.NewMessage(message.ID(), req)
+}
+
 // LiaisonService returns a new liaison service (deprecated - use NewLiaison).
 func LiaisonService(_ context.Context) (Service, error) {
        return &liaison{}, nil
@@ -280,12 +296,16 @@ func (l *liaison) PreRun(ctx context.Context) error {
 
        if metaSvc, ok := l.metadata.(metadata.Service); ok {
                
metaSvc.RegisterLiaisonCollector(commonv1.Catalog_CATALOG_TRACE, l)
+               
metaSvc.RegisterGroupDropHandler(commonv1.Catalog_CATALOG_TRACE, l)
        }
 
        collectLiaisonInfoListener := &collectLiaisonInfoListener{l: l}
        if subscribeErr := 
l.pipeline.Subscribe(data.TopicTraceCollectLiaisonInfo, 
collectLiaisonInfoListener); subscribeErr != nil {
                return fmt.Errorf("failed to subscribe to collect liaison info 
topic: %w", subscribeErr)
        }
+       if subscribeErr := l.pipeline.Subscribe(data.TopicTraceDropGroup, 
&dropGroupLiaisonListener{l: l}); subscribeErr != nil {
+               return fmt.Errorf("failed to subscribe to drop group topic: 
%w", subscribeErr)
+       }
 
        return l.pipeline.Subscribe(data.TopicTraceWrite, l.writeListener)
 }
@@ -310,6 +330,10 @@ func (l *liaison) LoadGroup(name string) 
(resourceSchema.Group, bool) {
        return l.schemaRepo.LoadGroup(name)
 }
 
+func (l *liaison) DropGroup(_ context.Context, groupName string) error {
+       return l.schemaRepo.DropGroup(groupName)
+}
+
 func (l *liaison) Trace(metadata *commonv1.Metadata) (Trace, error) {
        sm, ok := l.schemaRepo.Trace(metadata)
        if !ok {
diff --git a/banyand/trace/svc_standalone.go b/banyand/trace/svc_standalone.go
index 0e42b5c5b..77dd2f873 100644
--- a/banyand/trace/svc_standalone.go
+++ b/banyand/trace/svc_standalone.go
@@ -162,11 +162,16 @@ func (s *standalone) PreRun(ctx context.Context) error {
        if metaSvc, ok := s.metadata.(metadata.Service); ok {
                metaSvc.RegisterDataCollector(commonv1.Catalog_CATALOG_TRACE, 
&s.schemaRepo)
                
metaSvc.RegisterLiaisonCollector(commonv1.Catalog_CATALOG_TRACE, s)
+               
metaSvc.RegisterGroupDropHandler(commonv1.Catalog_CATALOG_TRACE, s)
        }
        subErr := s.pipeline.Subscribe(data.TopicTraceCollectDataInfo, 
&collectDataInfoListener{s: s})
        if subErr != nil {
                return fmt.Errorf("failed to subscribe to 
TopicTraceCollectDataInfo: %w", subErr)
        }
+       dropGroupErr := s.pipeline.Subscribe(data.TopicTraceDropGroup, 
&dropGroupDataListener{s: s})
+       if dropGroupErr != nil {
+               return fmt.Errorf("failed to subscribe to TopicTraceDropGroup: 
%w", dropGroupErr)
+       }
 
        // Initialize snapshot directory
        s.snapshotDir = filepath.Join(path, "snapshots")
@@ -230,6 +235,10 @@ func (s *standalone) LoadGroup(name string) 
(resourceSchema.Group, bool) {
        return s.schemaRepo.LoadGroup(name)
 }
 
+func (s *standalone) DropGroup(_ context.Context, groupName string) error {
+       return s.schemaRepo.DropGroup(groupName)
+}
+
 func (s *standalone) Trace(metadata *commonv1.Metadata) (Trace, error) {
        sm, ok := s.schemaRepo.Trace(metadata)
        if !ok {
@@ -563,6 +572,22 @@ func (l *collectDataInfoListener) Rev(ctx context.Context, 
message bus.Message)
        return bus.NewMessage(message.ID(), dataInfo)
 }
 
+type dropGroupDataListener struct {
+       *bus.UnImplementedHealthyListener
+       s *standalone
+}
+
+func (l *dropGroupDataListener) Rev(ctx context.Context, message bus.Message) bus.Message {
+       req, ok := message.Data().(*databasev1.GroupRegistryServiceDeleteRequest)
+       if !ok {
+               return bus.NewMessage(message.ID(), common.NewError("invalid data type for drop group request"))
+       }
+       if dropErr := l.s.DropGroup(ctx, req.Group); dropErr != nil {
+               return bus.NewMessage(message.ID(), common.NewError("failed to drop group data: %v", dropErr))
+       }
+       return bus.NewMessage(message.ID(), req)
+}
+
 // NewService returns a new service.
 func NewService(metadata metadata.Repo, pipeline queue.Server, omr 
observability.MetricsRegistry, pm protector.Memory) (Service, error) {
        return &standalone{
diff --git a/bydbctl/internal/cmd/group_test.go 
b/bydbctl/internal/cmd/group_test.go
index 1b5cde6be..ceac5023e 100644
--- a/bydbctl/internal/cmd/group_test.go
+++ b/bydbctl/internal/cmd/group_test.go
@@ -96,12 +96,13 @@ resource_opts:
        })
 
        It("delete group", func() {
-               rootCmd.SetArgs([]string{"group", "delete", "-g", "group1"})
-               out := capturer.CaptureStdout(func() {
-                       err := rootCmd.Execute()
-                       Expect(err).NotTo(HaveOccurred())
-               })
-               Expect(out).To(ContainSubstring("group group1 is deleted"))
+               Eventually(func(g Gomega) {
+                       rootCmd.SetArgs([]string{"group", "delete", "-g", 
"group1"})
+                       out := capturer.CaptureStdout(func() {
+                               
g.Expect(rootCmd.Execute()).NotTo(HaveOccurred())
+                       })
+                       g.Expect(out).To(ContainSubstring("group group1 is 
deleted"))
+               }).Should(Succeed())
        })
 
        It("list group", func() {
@@ -132,7 +133,7 @@ resource_opts:
                })
                resp := new(databasev1.GroupRegistryServiceListResponse)
                helpers.UnmarshalYAML([]byte(out), resp)
-               Expect(resp.Group).To(HaveLen(2)) // group1, group2 [internal group]
+               Expect(resp.Group).To(HaveLen(3)) // group1, group2, _deletion_task (internal group)
        })
 
        AfterEach(func() {
diff --git a/docs/api-reference.md b/docs/api-reference.md
index a4f79d4a1..d75e4fd19 100644
--- a/docs/api-reference.md
+++ b/docs/api-reference.md
@@ -2836,6 +2836,7 @@ GroupDeletionTask represents the status of a group 
deletion operation.
 | deleted_data_size_bytes | [int64](#int64) |  | deleted_data_size_bytes is the size of data that has been deleted in bytes. |
 | message | [string](#string) |  | message provides additional information about the task status. |
 | created_at | [google.protobuf.Timestamp](#google-protobuf-Timestamp) |  | created_at is the timestamp when the task was created. |
+| updated_at | [google.protobuf.Timestamp](#google-protobuf-Timestamp) |  | updated_at is the timestamp when the task was last updated. |
 
 
 
@@ -2910,6 +2911,7 @@ GroupRegistryServiceDeleteRequest is the request for 
deleting a group.
 | group | [string](#string) |  | group is the name of the group to delete. |
 | dry_run | [bool](#bool) |  | dry_run indicates whether to perform a dry run without actually deleting data. When true, returns what would be deleted without making changes. |
 | force | [bool](#bool) |  | force indicates whether to force delete the group even if it contains data. When false, deletion will fail if the group is not empty. |
+| data_only | [bool](#bool) |  | data_only indicates whether to delete only data files without removing metadata. When true, metadata are preserved. |
 
 
 
@@ -2924,8 +2926,7 @@ GroupRegistryServiceDeleteResponse is the response for 
deleting a group.
 
 | Field | Type | Label | Description |
 | ----- | ---- | ----- | ----------- |
-| deleted | [bool](#bool) |  | deleted indicates whether the group was deleted. |
-| task_id | [string](#string) |  | task_id is the ID of the background deletion task. |
+| schema_info | [SchemaInfo](#banyandb-database-v1-SchemaInfo) |  | schema_info contains the schema resources that would be deleted (populated in dry-run mode). |
 
 
 
diff --git a/pkg/index/inverted/inverted_series.go 
b/pkg/index/inverted/inverted_series.go
index e7c4d013c..e14426ae7 100644
--- a/pkg/index/inverted/inverted_series.go
+++ b/pkg/index/inverted/inverted_series.go
@@ -44,6 +44,9 @@ func (s *store) InsertSeriesBatch(batch index.Batch) error {
                return nil
        }
        if !s.closer.AddRunning() {
+               if batch.PersistentCallback != nil {
+                       batch.PersistentCallback(errors.New("store is closed"))
+               }
                return nil
        }
        defer s.closer.Done()
@@ -64,6 +67,9 @@ func (s *store) UpdateSeriesBatch(batch index.Batch) error {
                return nil
        }
        if !s.closer.AddRunning() {
+               if batch.PersistentCallback != nil {
+                       batch.PersistentCallback(errors.New("store is closed"))
+               }
                return nil
        }
        defer s.closer.Done()
diff --git a/pkg/schema/cache.go b/pkg/schema/cache.go
index ac3937ec9..5d2c28400 100644
--- a/pkg/schema/cache.go
+++ b/pkg/schema/cache.go
@@ -27,7 +27,6 @@ import (
        "time"
 
        "github.com/pkg/errors"
-       "github.com/rs/zerolog/log"
        "go.uber.org/multierr"
        "google.golang.org/protobuf/proto"
 
@@ -296,11 +295,18 @@ func (sr *schemaRepo) createGroup(name string) (g *group) 
{
 func (sr *schemaRepo) deleteGroup(groupMeta *commonv1.Metadata) error {
        name := groupMeta.GetName()
        g, loaded := sr.groupMap.LoadAndDelete(name)
-       log.Info().Str("group", name).Bool("loaded", loaded).Msg("deleting 
group")
        if !loaded {
                return nil
        }
-       return g.(*group).close()
+       grp := g.(*group)
+       return grp.close()
+}
+
+func (sr *schemaRepo) DropGroup(name string) error {
+       if g, ok := sr.groupMap.Load(name); ok {
+               return g.(*group).drop()
+       }
+       return nil
 }
 
 func (sr *schemaRepo) getGroup(name string) (*group, bool) {
@@ -570,3 +576,14 @@ func (g *group) close() (err error) {
        }
        return multierr.Append(err, g.SupplyTSDB().Close())
 }
+
+func (g *group) drop() error {
+       if !g.isInit() || g.isPortable() {
+               return nil
+       }
+       db := g.db.Load()
+       if db == nil {
+               return nil
+       }
+       return db.(DB).Drop()
+}
diff --git a/pkg/schema/schema.go b/pkg/schema/schema.go
index e80a59a4c..e99bcce9c 100644
--- a/pkg/schema/schema.go
+++ b/pkg/schema/schema.go
@@ -97,6 +97,7 @@ type ResourceSupplier interface {
 type DB interface {
        io.Closer
        UpdateOptions(opts *commonv1.ResourceOpts)
+       Drop() error
 }
 
 // Repository is the collection of several hierarchies groups by a "Group".
@@ -109,4 +110,5 @@ type Repository interface {
        LoadResource(metadata *commonv1.Metadata) (Resource, bool)
        Close()
        StopCh() <-chan struct{}
+       DropGroup(groupName string) error
 }
diff --git a/pkg/test/setup/setup.go b/pkg/test/setup/setup.go
index 321ad5f4c..7dcd4412b 100644
--- a/pkg/test/setup/setup.go
+++ b/pkg/test/setup/setup.go
@@ -736,14 +736,7 @@ func DataNodeWithAddrAndDir(config *ClusterConfig, flags 
...string) (string, str
        }
 }
 
-// LiaisonNode runs a liaison node.
-func LiaisonNode(config *ClusterConfig, flags ...string) (grpcAddr string, 
closeFn func()) {
-       grpcAddr, _, closeFn = LiaisonNodeWithHTTP(config, flags...)
-       return
-}
-
-// LiaisonNodeWithHTTP runs a liaison node with HTTP enabled and returns the gRPC and HTTP addresses.
-func LiaisonNodeWithHTTP(config *ClusterConfig, flags ...string) (string, 
string, func()) {
+func startLiaisonNode(config *ClusterConfig, path string, flags ...string) 
(string, string, func()) {
        if config == nil {
                config = defaultClusterConfig
        }
@@ -752,7 +745,6 @@ func LiaisonNodeWithHTTP(config *ClusterConfig, flags 
...string) (string, string
        grpcAddr := fmt.Sprintf("%s:%d", host, ports[0])
        httpAddr := fmt.Sprintf("%s:%d", host, ports[1])
        nodeHost := "127.0.0.1"
-       path, deferFn, err := test.NewSpace()
        logger.Infof("liaison test directory: %s", path)
        gomega.Expect(err).NotTo(gomega.HaveOccurred())
        isPropertyMode := config.SchemaRegistry.Mode == ModeProperty
@@ -816,6 +808,33 @@ func LiaisonNodeWithHTTP(config *ClusterConfig, flags 
...string) (string, string
                })
                fmt.Println("done")
                closeFn()
+       }
+}
+
+// LiaisonNode runs a liaison node.
+func LiaisonNode(config *ClusterConfig, flags ...string) (grpcAddr string, 
closeFn func()) {
+       grpcAddr, _, closeFn = LiaisonNodeWithHTTP(config, flags...)
+       return
+}
+
+// LiaisonNodeWithHTTP runs a liaison node with HTTP enabled and returns the gRPC and HTTP addresses.
+func LiaisonNodeWithHTTP(config *ClusterConfig, flags ...string) (string, 
string, func()) {
+       dataDir, deferFn, dirErr := test.NewSpace()
+       gomega.Expect(dirErr).NotTo(gomega.HaveOccurred())
+       grpcAddr, httpAddr, closeFn := startLiaisonNode(config, dataDir, 
flags...)
+       return grpcAddr, httpAddr, func() {
+               closeFn()
+               deferFn()
+       }
+}
+
+// LiaisonNodeWithAddrAndDir runs a liaison node and returns the gRPC address, root data path, and closer.
+func LiaisonNodeWithAddrAndDir(config *ClusterConfig, flags ...string) 
(string, string, func()) {
+       dataDir, deferFn, dirErr := test.NewSpace()
+       gomega.Expect(dirErr).NotTo(gomega.HaveOccurred())
+       grpcAddr, _, closeFn := startLiaisonNode(config, dataDir, flags...)
+       return grpcAddr, dataDir, func() {
+               closeFn()
                deferFn()
        }
 }
diff --git a/test/integration/distributed/deletion/deletion_suite_test.go 
b/test/integration/distributed/deletion/deletion_suite_test.go
new file mode 100644
index 000000000..8c0015cc6
--- /dev/null
+++ b/test/integration/distributed/deletion/deletion_suite_test.go
@@ -0,0 +1,313 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package integration_deletion_test provides integration tests for group deletion in distributed mode.
+package integration_deletion_test
+
+import (
+       "context"
+       "fmt"
+       "os"
+       "path/filepath"
+       "strings"
+       "testing"
+       "time"
+
+       . "github.com/onsi/ginkgo/v2"
+       . "github.com/onsi/gomega"
+       "github.com/onsi/gomega/gleak"
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/codes"
+       "google.golang.org/grpc/credentials/insecure"
+       "google.golang.org/grpc/status"
+
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/embeddedetcd"
+       "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
+       "github.com/apache/skywalking-banyandb/pkg/test"
+       "github.com/apache/skywalking-banyandb/pkg/test/flags"
+       "github.com/apache/skywalking-banyandb/pkg/test/gmatcher"
+       "github.com/apache/skywalking-banyandb/pkg/test/setup"
+)
+
+func TestDeletion(t *testing.T) {
+       RegisterFailHandler(Fail)
+       RunSpecs(t, "Distributed Deletion Suite")
+}
+
+var (
+       deferFunc       func()
+       goods           []gleak.Goroutine
+       connection      *grpc.ClientConn
+       groupClient     databasev1.GroupRegistryServiceClient
+       dataNode0Path   string
+       dataNode1Path   string
+       liaisonNodePath string
+)
+
+var _ = SynchronizedBeforeSuite(func() []byte {
+       Expect(logger.Init(logger.Logging{
+               Env:   "dev",
+               Level: flags.LogLevel,
+       })).To(Succeed())
+       pool.EnableStackTracking(true)
+       goods = gleak.Goroutines()
+
+       By("Starting etcd server")
+       ports, err := test.AllocateFreePorts(2)
+       Expect(err).NotTo(HaveOccurred())
+       dir, spaceDef, err := test.NewSpace()
+       Expect(err).NotTo(HaveOccurred())
+       ep := fmt.Sprintf("http://127.0.0.1:%d", ports[0])
+       server, err := embeddedetcd.NewServer(
+               embeddedetcd.ConfigureListener([]string{ep}, []string{fmt.Sprintf("http://127.0.0.1:%d", ports[1])}),
+               embeddedetcd.RootDir(dir),
+               embeddedetcd.AutoCompactionMode("periodic"),
+               embeddedetcd.AutoCompactionRetention("1h"),
+               embeddedetcd.QuotaBackendBytes(2*1024*1024*1024),
+       )
+       Expect(err).ShouldNot(HaveOccurred())
+       <-server.ReadyNotify()
+
+       clusterConfig := setup.EtcdClusterConfig(ep)
+       By("Starting data node 0")
+       _, dn0Path, closeDataNode0 := 
setup.DataNodeWithAddrAndDir(clusterConfig)
+       By("Starting data node 1")
+       _, dn1Path, closeDataNode1 := 
setup.DataNodeWithAddrAndDir(clusterConfig)
+       By("Starting liaison node")
+       liaisonAddr, liaisonPath, closerLiaisonNode := 
setup.LiaisonNodeWithAddrAndDir(clusterConfig)
+
+       deferFunc = func() {
+               closerLiaisonNode()
+               closeDataNode0()
+               closeDataNode1()
+               _ = server.Close()
+               <-server.StopNotify()
+               spaceDef()
+       }
+
+       return []byte(liaisonAddr + "," + dn0Path + "," + dn1Path + "," + 
liaisonPath)
+}, func(address []byte) {
+       parts := strings.SplitN(string(address), ",", 4)
+       liaisonGrpcAddr := parts[0]
+       dataNode0Path = parts[1]
+       dataNode1Path = parts[2]
+       liaisonNodePath = parts[3]
+
+       var connErr error
+       connection, connErr = grpchelper.Conn(liaisonGrpcAddr, 10*time.Second,
+               grpc.WithTransportCredentials(insecure.NewCredentials()))
+       Expect(connErr).NotTo(HaveOccurred())
+
+       groupClient = databasev1.NewGroupRegistryServiceClient(connection)
+})
+
+var _ = SynchronizedAfterSuite(func() {
+       if connection != nil {
+               Expect(connection.Close()).To(Succeed())
+       }
+}, func() {})
+
+var _ = ReportAfterSuite("Distributed Deletion Suite", func(report Report) {
+       if report.SuiteSucceeded {
+               if deferFunc != nil {
+                       deferFunc()
+               }
+               Eventually(gleak.Goroutines, 
flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
+               Eventually(pool.AllRefsCount, 
flags.EventuallyTimeout).Should(gmatcher.HaveZeroRef())
+       }
+})
+
+var _ = Describe("GroupDeletion", func() {
+       It("returns NotFound when deleting a nonexistent group", func() {
+               _, err := groupClient.Delete(context.TODO(), 
&databasev1.GroupRegistryServiceDeleteRequest{
+                       Group: "nonexistent-group-dist-xyz",
+               })
+               Expect(err).Should(HaveOccurred())
+               errStatus, ok := status.FromError(err)
+               Expect(ok).To(BeTrue())
+               Expect(errStatus.Code()).To(Equal(codes.NotFound))
+       })
+
+       It("can delete an empty group without force flag", func() {
+               const newGroup = "dist-empty-deletion-group"
+               dn0StreamDir := filepath.Join(dataNode0Path, "stream", "data", 
newGroup)
+               dn1StreamDir := filepath.Join(dataNode1Path, "stream", "data", 
newGroup)
+               liaisonStreamDir := filepath.Join(liaisonNodePath, "stream", 
"data", newGroup)
+               By("Creating a new empty group")
+               _, err := groupClient.Create(context.TODO(), 
&databasev1.GroupRegistryServiceCreateRequest{
+                       Group: &commonv1.Group{
+                               Metadata: &commonv1.Metadata{Name: newGroup},
+                               Catalog:  commonv1.Catalog_CATALOG_STREAM,
+                               ResourceOpts: &commonv1.ResourceOpts{
+                                       ShardNum: 1,
+                                       SegmentInterval: &commonv1.IntervalRule{
+                                               Unit: 
commonv1.IntervalRule_UNIT_DAY,
+                                               Num:  1,
+                                       },
+                                       Ttl: &commonv1.IntervalRule{
+                                               Unit: 
commonv1.IntervalRule_UNIT_DAY,
+                                               Num:  7,
+                                       },
+                               },
+                       },
+               })
+               Expect(err).ShouldNot(HaveOccurred())
+
+               By("Verifying the group exists")
+               getResp, getErr := groupClient.Get(context.TODO(), 
&databasev1.GroupRegistryServiceGetRequest{Group: newGroup})
+               Expect(getErr).ShouldNot(HaveOccurred())
+               
Expect(getResp.GetGroup().GetMetadata().GetName()).To(Equal(newGroup))
+
+               By("Verifying group data directories exist on all nodes")
+               Eventually(func() bool {
+                       _, statErr0 := os.Stat(dn0StreamDir)
+                       _, statErr1 := os.Stat(dn1StreamDir)
+                       _, statErrL := os.Stat(liaisonStreamDir)
+                       return statErr0 == nil && statErr1 == nil && statErrL 
== nil
+               }, flags.EventuallyTimeout).Should(BeTrue())
+
+               By("Deleting the empty group without force")
+               Eventually(func() error {
+                       _, deleteErr := groupClient.Delete(context.TODO(), 
&databasev1.GroupRegistryServiceDeleteRequest{
+                               Group: newGroup,
+                       })
+                       return deleteErr
+               }, flags.EventuallyTimeout).Should(Succeed())
+
+               By("Verifying the group is eventually removed")
+               Eventually(func() codes.Code {
+                       _, getErr := groupClient.Get(context.TODO(), 
&databasev1.GroupRegistryServiceGetRequest{Group: newGroup})
+                       if getErr == nil {
+                               return codes.OK
+                       }
+                       st, _ := status.FromError(getErr)
+                       return st.Code()
+               }, flags.EventuallyTimeout).Should(Equal(codes.NotFound))
+
+               By("Verifying group data directories are removed from all 
nodes")
+               Eventually(func() bool {
+                       _, statErr0 := os.Stat(dn0StreamDir)
+                       _, statErr1 := os.Stat(dn1StreamDir)
+                       _, statErrL := os.Stat(liaisonStreamDir)
+                       return os.IsNotExist(statErr0) && 
os.IsNotExist(statErr1) && os.IsNotExist(statErrL)
+               }, flags.EventuallyTimeout).Should(BeTrue())
+       })
+
+       It("can delete an existing group with force=true", func() {
+               const groupName = "dist-force-deletion-group"
+               dn0MeasureDir := filepath.Join(dataNode0Path, "measure", 
"data", groupName)
+               dn1MeasureDir := filepath.Join(dataNode1Path, "measure", 
"data", groupName)
+               liaisonMeasureDir := filepath.Join(liaisonNodePath, "measure", 
"data", groupName)
+               By("Creating a group with resources")
+               _, err := groupClient.Create(context.TODO(), 
&databasev1.GroupRegistryServiceCreateRequest{
+                       Group: &commonv1.Group{
+                               Metadata: &commonv1.Metadata{Name: groupName},
+                               Catalog:  commonv1.Catalog_CATALOG_MEASURE,
+                               ResourceOpts: &commonv1.ResourceOpts{
+                                       ShardNum: 1,
+                                       SegmentInterval: &commonv1.IntervalRule{
+                                               Unit: 
commonv1.IntervalRule_UNIT_DAY,
+                                               Num:  1,
+                                       },
+                                       Ttl: &commonv1.IntervalRule{
+                                               Unit: 
commonv1.IntervalRule_UNIT_DAY,
+                                               Num:  7,
+                                       },
+                               },
+                       },
+               })
+               Expect(err).ShouldNot(HaveOccurred())
+
+               By("Verifying group data directories exist on all nodes")
+               Eventually(func() bool {
+                       _, statErr0 := os.Stat(dn0MeasureDir)
+                       _, statErr1 := os.Stat(dn1MeasureDir)
+                       _, statErrL := os.Stat(liaisonMeasureDir)
+                       return statErr0 == nil && statErr1 == nil && statErrL 
== nil
+               }, flags.EventuallyTimeout).Should(BeTrue())
+
+               By("Deleting group with force=true")
+               _, err = groupClient.Delete(context.TODO(), 
&databasev1.GroupRegistryServiceDeleteRequest{
+                       Group: groupName,
+                       Force: true,
+               })
+               Expect(err).ShouldNot(HaveOccurred())
+
+               By("Verifying the group is eventually removed")
+               Eventually(func() codes.Code {
+                       _, getErr := groupClient.Get(context.TODO(), 
&databasev1.GroupRegistryServiceGetRequest{Group: groupName})
+                       if getErr == nil {
+                               return codes.OK
+                       }
+                       st, _ := status.FromError(getErr)
+                       return st.Code()
+               }, flags.EventuallyTimeout).Should(Equal(codes.NotFound))
+
+               By("Verifying group data directories are removed from all 
nodes")
+               Eventually(func() bool {
+                       _, statErr0 := os.Stat(dn0MeasureDir)
+                       _, statErr1 := os.Stat(dn1MeasureDir)
+                       _, statErrL := os.Stat(liaisonMeasureDir)
+                       return os.IsNotExist(statErr0) && 
os.IsNotExist(statErr1) && os.IsNotExist(statErrL)
+               }, flags.EventuallyTimeout).Should(BeTrue())
+       })
+
+       It("can query deletion task status until completed", func() {
+               const groupName = "dist-deletion-task-group"
+               By("Creating a group to delete")
+               _, err := groupClient.Create(context.TODO(), 
&databasev1.GroupRegistryServiceCreateRequest{
+                       Group: &commonv1.Group{
+                               Metadata: &commonv1.Metadata{Name: groupName},
+                               Catalog:  commonv1.Catalog_CATALOG_MEASURE,
+                               ResourceOpts: &commonv1.ResourceOpts{
+                                       ShardNum: 1,
+                                       SegmentInterval: &commonv1.IntervalRule{
+                                               Unit: 
commonv1.IntervalRule_UNIT_DAY,
+                                               Num:  1,
+                                       },
+                                       Ttl: &commonv1.IntervalRule{
+                                               Unit: 
commonv1.IntervalRule_UNIT_DAY,
+                                               Num:  7,
+                                       },
+                               },
+                       },
+               })
+               Expect(err).ShouldNot(HaveOccurred())
+
+               By("Initiating group deletion with force=true")
+               _, err = groupClient.Delete(context.TODO(), 
&databasev1.GroupRegistryServiceDeleteRequest{
+                       Group: groupName,
+                       Force: true,
+               })
+               Expect(err).ShouldNot(HaveOccurred())
+
+               By("Waiting for deletion task to reach COMPLETED")
+               Eventually(func() databasev1.GroupDeletionTask_Phase {
+                       queryResp, queryErr := 
groupClient.Query(context.TODO(), &databasev1.GroupRegistryServiceQueryRequest{
+                               Group: groupName,
+                       })
+                       if queryErr != nil {
+                               return 
databasev1.GroupDeletionTask_PHASE_UNSPECIFIED
+                       }
+                       return queryResp.GetTask().GetCurrentPhase()
+               }, flags.EventuallyTimeout, 
100*time.Millisecond).Should(Equal(databasev1.GroupDeletionTask_PHASE_COMPLETED))
+       })
+})
diff --git a/test/integration/distributed/inspect/common.go 
b/test/integration/distributed/inspection/common.go
similarity index 99%
rename from test/integration/distributed/inspect/common.go
rename to test/integration/distributed/inspection/common.go
index c016a4993..3307b83c7 100644
--- a/test/integration/distributed/inspect/common.go
+++ b/test/integration/distributed/inspection/common.go
@@ -15,8 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-// Package inspect provides shared test setup for the inspect functionality in distributed mode.
-package inspect
+// Package inspection provides shared test setup for the inspect functionality in distributed mode.
+package inspection
 
 import (
        "context"
diff --git a/test/integration/distributed/inspect/etcd/suite_test.go b/test/integration/distributed/inspection/etcd/suite_test.go
similarity index 94%
rename from test/integration/distributed/inspect/etcd/suite_test.go
rename to test/integration/distributed/inspection/etcd/suite_test.go
index 1638d69eb..e0791fba9 100644
--- a/test/integration/distributed/inspect/etcd/suite_test.go
+++ b/test/integration/distributed/inspection/etcd/suite_test.go
@@ -24,11 +24,11 @@ import (
        . "github.com/onsi/gomega"
 
        "github.com/apache/skywalking-banyandb/pkg/test/setup"
-       
"github.com/apache/skywalking-banyandb/test/integration/distributed/inspect"
+       
"github.com/apache/skywalking-banyandb/test/integration/distributed/inspection"
 )
 
 func init() {
-       inspect.SetupFunc = func() inspect.SetupResult {
+       inspection.SetupFunc = func() inspection.SetupResult {
                By("Starting etcd server")
                ep, _, etcdCleanup := setup.StartEmbeddedEtcd()
                config := setup.EtcdClusterConfig(ep)
@@ -40,7 +40,7 @@ func init() {
                By("Starting liaison node")
                liaisonAddr, closerLiaisonNode := setup.LiaisonNode(config)
 
-               return inspect.SetupResult{
+               return inspection.SetupResult{
                        LiaisonAddr:  liaisonAddr,
                        EtcdEndpoint: ep,
                        StopFunc: func() {
diff --git a/test/integration/distributed/inspect/property/suite_test.go b/test/integration/distributed/inspection/property/suite_test.go
similarity index 94%
rename from test/integration/distributed/inspect/property/suite_test.go
rename to test/integration/distributed/inspection/property/suite_test.go
index 161d60a76..a0b9ea1b1 100644
--- a/test/integration/distributed/inspect/property/suite_test.go
+++ b/test/integration/distributed/inspection/property/suite_test.go
@@ -25,11 +25,11 @@ import (
 
        "github.com/apache/skywalking-banyandb/pkg/test"
        "github.com/apache/skywalking-banyandb/pkg/test/setup"
-       
"github.com/apache/skywalking-banyandb/test/integration/distributed/inspect"
+       
"github.com/apache/skywalking-banyandb/test/integration/distributed/inspection"
 )
 
 func init() {
-       inspect.SetupFunc = func() inspect.SetupResult {
+       inspection.SetupFunc = func() inspection.SetupResult {
                tmpDir, tmpDirCleanup, tmpErr := test.NewSpace()
                Expect(tmpErr).NotTo(HaveOccurred())
                dfWriter := setup.NewDiscoveryFileWriter(tmpDir)
@@ -41,7 +41,7 @@ func init() {
                By("Starting liaison node")
                liaisonAddr, closerLiaisonNode := setup.LiaisonNode(config)
 
-               return inspect.SetupResult{
+               return inspection.SetupResult{
                        LiaisonAddr: liaisonAddr,
                        StopFunc: func() {
                                closerLiaisonNode()
diff --git a/test/integration/standalone/deletion/deletion_suite_test.go b/test/integration/standalone/deletion/deletion_suite_test.go
new file mode 100644
index 000000000..4003682a1
--- /dev/null
+++ b/test/integration/standalone/deletion/deletion_suite_test.go
@@ -0,0 +1,225 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package integration_deletion_test provides integration tests for group deletion in standalone mode.
+package integration_deletion_test
+
+import (
+       "context"
+       "os"
+       "path/filepath"
+       "testing"
+       "time"
+
+       g "github.com/onsi/ginkgo/v2"
+       gm "github.com/onsi/gomega"
+       "github.com/onsi/gomega/gleak"
+       grpclib "google.golang.org/grpc"
+       "google.golang.org/grpc/codes"
+       "google.golang.org/grpc/credentials/insecure"
+       "google.golang.org/grpc/status"
+
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/test"
+       "github.com/apache/skywalking-banyandb/pkg/test/flags"
+       "github.com/apache/skywalking-banyandb/pkg/test/setup"
+       integration_standalone 
"github.com/apache/skywalking-banyandb/test/integration/standalone"
+)
+
+func TestDeletion(t *testing.T) {
+       gm.RegisterFailHandler(g.Fail)
+       g.RunSpecs(t, "Integration Deletion Suite", 
g.Label(integration_standalone.Labels...))
+}
+
+var _ = g.BeforeSuite(func() {
+       gm.Expect(logger.Init(logger.Logging{
+               Env:   "dev",
+               Level: flags.LogLevel,
+       })).To(gm.Succeed())
+})
+
+var _ = g.Describe("GroupDeletion", func() {
+       var goods []gleak.Goroutine
+       var conn *grpclib.ClientConn
+       var deferFn func()
+       var dataPath string
+       var groupClient databasev1.GroupRegistryServiceClient
+
+       g.BeforeEach(func() {
+               goods = gleak.Goroutines()
+               var pathDeferFn func()
+               var spaceErr error
+               dataPath, pathDeferFn, spaceErr = test.NewSpace()
+               gm.Expect(spaceErr).NotTo(gm.HaveOccurred())
+               ports, portsErr := test.AllocateFreePorts(4)
+               gm.Expect(portsErr).NotTo(gm.HaveOccurred())
+               var addr string
+               var serverCloseFn func()
+               addr, _, serverCloseFn = setup.ClosableStandalone(nil, 
dataPath, ports)
+               var connErr error
+               conn, connErr = grpchelper.Conn(addr, 10*time.Second, 
grpclib.WithTransportCredentials(insecure.NewCredentials()))
+               gm.Expect(connErr).NotTo(gm.HaveOccurred())
+               groupClient = databasev1.NewGroupRegistryServiceClient(conn)
+               deferFn = func() {
+                       serverCloseFn()
+                       pathDeferFn()
+               }
+       })
+
+       g.AfterEach(func() {
+               _ = conn.Close()
+               deferFn()
+               gm.Eventually(gleak.Goroutines, 
flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
+       })
+
+       g.It("returns NotFound when deleting a nonexistent group", func() {
+               _, err := groupClient.Delete(context.TODO(), 
&databasev1.GroupRegistryServiceDeleteRequest{
+                       Group: "nonexistent-group-xyz",
+               })
+               gm.Expect(err).Should(gm.HaveOccurred())
+               errStatus, ok := status.FromError(err)
+               gm.Expect(ok).To(gm.BeTrue())
+               gm.Expect(errStatus.Code()).To(gm.Equal(codes.NotFound))
+       })
+
+       g.It("can delete an empty group without force flag", func() {
+               const newGroup = "empty-test-group"
+               streamDir := filepath.Join(dataPath, "stream", "data", newGroup)
+               g.By("Creating a new empty group")
+               _, err := groupClient.Create(context.TODO(), 
&databasev1.GroupRegistryServiceCreateRequest{
+                       Group: &commonv1.Group{
+                               Metadata: &commonv1.Metadata{Name: newGroup},
+                               Catalog:  commonv1.Catalog_CATALOG_STREAM,
+                               ResourceOpts: &commonv1.ResourceOpts{
+                                       ShardNum: 1,
+                                       SegmentInterval: &commonv1.IntervalRule{
+                                               Unit: 
commonv1.IntervalRule_UNIT_DAY,
+                                               Num:  1,
+                                       },
+                                       Ttl: &commonv1.IntervalRule{
+                                               Unit: 
commonv1.IntervalRule_UNIT_DAY,
+                                               Num:  7,
+                                       },
+                               },
+                       },
+               })
+               gm.Expect(err).ShouldNot(gm.HaveOccurred())
+
+               g.By("Verifying the group exists")
+               getResp, err := groupClient.Get(context.TODO(), 
&databasev1.GroupRegistryServiceGetRequest{Group: newGroup})
+               gm.Expect(err).ShouldNot(gm.HaveOccurred())
+               
gm.Expect(getResp.GetGroup().GetMetadata().GetName()).To(gm.Equal(newGroup))
+
+               g.By("Verifying group data directory exists on disk")
+               gm.Eventually(func() bool {
+                       _, statErr := os.Stat(streamDir)
+                       return statErr == nil
+               }, flags.EventuallyTimeout).Should(gm.BeTrue())
+
+               g.By("Deleting the empty group without force")
+               gm.Eventually(func() error {
+                       _, deleteErr := groupClient.Delete(context.TODO(), 
&databasev1.GroupRegistryServiceDeleteRequest{
+                               Group: newGroup,
+                       })
+                       return deleteErr
+               }, flags.EventuallyTimeout).Should(gm.Succeed())
+
+               g.By("Verifying the group is eventually removed")
+               gm.Eventually(func() codes.Code {
+                       _, getErr := groupClient.Get(context.TODO(), 
&databasev1.GroupRegistryServiceGetRequest{Group: newGroup})
+                       if getErr == nil {
+                               return codes.OK
+                       }
+                       st, _ := status.FromError(getErr)
+                       return st.Code()
+               }, flags.EventuallyTimeout).Should(gm.Equal(codes.NotFound))
+
+               g.By("Verifying group data directory is removed from disk")
+               gm.Eventually(func() bool {
+                       _, statErr := os.Stat(streamDir)
+                       return os.IsNotExist(statErr)
+               }, flags.EventuallyTimeout).Should(gm.BeTrue())
+       })
+
+       g.It("can delete an existing group with force=true", func() {
+               const groupName = "sw_metric"
+               measureDir := filepath.Join(dataPath, "measure", "data", 
groupName)
+               g.By("Verifying the group exists with resources")
+               _, err := groupClient.Get(context.TODO(), 
&databasev1.GroupRegistryServiceGetRequest{Group: groupName})
+               gm.Expect(err).ShouldNot(gm.HaveOccurred())
+
+               g.By("Verifying group data directory exists on disk")
+               gm.Eventually(func() bool {
+                       _, statErr := os.Stat(measureDir)
+                       return statErr == nil
+               }, flags.EventuallyTimeout).Should(gm.BeTrue())
+
+               g.By("Deleting group with force=true")
+               _, err = groupClient.Delete(context.TODO(), 
&databasev1.GroupRegistryServiceDeleteRequest{
+                       Group: groupName,
+                       Force: true,
+               })
+               gm.Expect(err).ShouldNot(gm.HaveOccurred())
+
+               g.By("Verifying the group is eventually removed")
+               gm.Eventually(func() codes.Code {
+                       _, getErr := groupClient.Get(context.TODO(), 
&databasev1.GroupRegistryServiceGetRequest{Group: groupName})
+                       if getErr == nil {
+                               return codes.OK
+                       }
+                       st, _ := status.FromError(getErr)
+                       return st.Code()
+               }, flags.EventuallyTimeout).Should(gm.Equal(codes.NotFound))
+
+               g.By("Verifying group data directory is removed from disk")
+               gm.Eventually(func() bool {
+                       _, statErr := os.Stat(measureDir)
+                       return os.IsNotExist(statErr)
+               }, flags.EventuallyTimeout).Should(gm.BeTrue())
+       })
+
+       g.It("can query deletion task status until completed", func() {
+               const groupName = "sw_metric"
+               measureDir := filepath.Join(dataPath, "measure", "data", 
groupName)
+               g.By("Verifying group data directory exists on disk before 
deletion")
+               gm.Eventually(func() bool {
+                       _, statErr := os.Stat(measureDir)
+                       return statErr == nil
+               }, flags.EventuallyTimeout).Should(gm.BeTrue())
+
+               g.By("Initiating group deletion with force=true")
+               _, err := groupClient.Delete(context.TODO(), 
&databasev1.GroupRegistryServiceDeleteRequest{
+                       Group: groupName,
+                       Force: true,
+               })
+               gm.Expect(err).ShouldNot(gm.HaveOccurred())
+
+               g.By("Waiting for deletion task to reach COMPLETED")
+               gm.Eventually(func() databasev1.GroupDeletionTask_Phase {
+                       queryResp, queryErr := 
groupClient.Query(context.TODO(), &databasev1.GroupRegistryServiceQueryRequest{
+                               Group: groupName,
+                       })
+                       if queryErr != nil {
+                               return 
databasev1.GroupDeletionTask_PHASE_UNSPECIFIED
+                       }
+                       return queryResp.GetTask().GetCurrentPhase()
+               }, flags.EventuallyTimeout, 
100*time.Millisecond).Should(gm.Equal(databasev1.GroupDeletionTask_PHASE_COMPLETED))
+       })
+})
diff --git a/test/integration/standalone/inspect/common.go b/test/integration/standalone/inspection/common.go
similarity index 99%
rename from test/integration/standalone/inspect/common.go
rename to test/integration/standalone/inspection/common.go
index f4e138e36..6d40c68f1 100644
--- a/test/integration/standalone/inspect/common.go
+++ b/test/integration/standalone/inspection/common.go
@@ -15,8 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-// Package inspect provides integration tests for the inspect functionality in standalone mode.
-package inspect
+// Package inspection provides integration tests for the inspect functionality in standalone mode.
+package inspection
 
 import (
        "context"
diff --git a/test/integration/standalone/inspect/etcd/suite_test.go b/test/integration/standalone/inspection/etcd/suite_test.go
similarity index 93%
rename from test/integration/standalone/inspect/etcd/suite_test.go
rename to test/integration/standalone/inspection/etcd/suite_test.go
index f34bd4fa6..77d37b7c8 100644
--- a/test/integration/standalone/inspect/etcd/suite_test.go
+++ b/test/integration/standalone/inspection/etcd/suite_test.go
@@ -25,14 +25,14 @@ import (
 
        "github.com/apache/skywalking-banyandb/pkg/test/setup"
        integration_standalone 
"github.com/apache/skywalking-banyandb/test/integration/standalone"
-       
"github.com/apache/skywalking-banyandb/test/integration/standalone/inspect"
+       
"github.com/apache/skywalking-banyandb/test/integration/standalone/inspection"
 )
 
 func init() {
-       inspect.SetupFunc = func() inspect.SetupResult {
+       inspection.SetupFunc = func() inspection.SetupResult {
                By("Starting standalone server")
                addr, _, closeFn := setup.EmptyStandalone(nil)
-               return inspect.SetupResult{
+               return inspection.SetupResult{
                        Addr:     addr,
                        StopFunc: closeFn,
                }
diff --git a/test/integration/standalone/inspect/property/suite_test.go b/test/integration/standalone/inspection/property/suite_test.go
similarity index 94%
rename from test/integration/standalone/inspect/property/suite_test.go
rename to test/integration/standalone/inspection/property/suite_test.go
index 6edcacc54..f214bcbad 100644
--- a/test/integration/standalone/inspect/property/suite_test.go
+++ b/test/integration/standalone/inspection/property/suite_test.go
@@ -26,18 +26,18 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/test"
        "github.com/apache/skywalking-banyandb/pkg/test/setup"
        integration_standalone 
"github.com/apache/skywalking-banyandb/test/integration/standalone"
-       
"github.com/apache/skywalking-banyandb/test/integration/standalone/inspect"
+       
"github.com/apache/skywalking-banyandb/test/integration/standalone/inspection"
 )
 
 func init() {
-       inspect.SetupFunc = func() inspect.SetupResult {
+       inspection.SetupFunc = func() inspection.SetupResult {
                By("Starting standalone server with property mode")
                tmpDir, tmpDirCleanup, tmpErr := test.NewSpace()
                Expect(tmpErr).NotTo(HaveOccurred())
                dfWriter := setup.NewDiscoveryFileWriter(tmpDir)
                config := setup.PropertyClusterConfig(dfWriter)
                addr, _, closeFn := setup.EmptyStandalone(config)
-               return inspect.SetupResult{
+               return inspection.SetupResult{
                        Addr: addr,
                        StopFunc: func() {
                                closeFn()

Reply via email to