Copilot commented on code in PR #1005:
URL: 
https://github.com/apache/skywalking-banyandb/pull/1005#discussion_r2919737654


##########
banyand/liaison/grpc/discovery.go:
##########
@@ -138,13 +141,65 @@ func (i identity) String() string {
 
 var _ schema.EventHandler = (*groupRepo)(nil)
 
+type groupInflight struct {
+       done chan struct{}
+       wg   sync.WaitGroup
+}
+
 type groupRepo struct {
        schema.UnimplementedOnInitHandler
        log          *logger.Logger
        resourceOpts map[string]*commonv1.ResourceOpts
+       inflight     map[string]*groupInflight
        sync.RWMutex
 }
 
+func (s *groupRepo) acquireRequest(groupName string) error {
+       s.RWMutex.Lock()
+       defer s.RWMutex.Unlock()
+       item, ok := s.inflight[groupName]
+       if ok && item.done != nil {
+               return fmt.Errorf("%s: %w", groupName, errGroupPendingDeletion)
+       }
+       if !ok {
+               item = &groupInflight{}
+               s.inflight[groupName] = item
+       }
+       item.wg.Add(1)
+       return nil
+}
+
+func (s *groupRepo) releaseRequest(groupName string) {
+       s.RWMutex.RLock()
+       item, ok := s.inflight[groupName]
+       s.RWMutex.RUnlock()
+       if ok {
+               item.wg.Done()
+       }
+}
+
+func (s *groupRepo) waitInflightRequests(groupName string) <-chan struct{} {
+       s.RWMutex.Lock()
+       item, ok := s.inflight[groupName]
+       if !ok {
+               item = &groupInflight{}
+               s.inflight[groupName] = item
+       }
+       item.done = make(chan struct{})
+       s.RWMutex.Unlock()
+       go func() {
+               item.wg.Wait()
+               close(item.done)
+       }()
+       return item.done

Review Comment:
   waitInflightRequests overwrites item.done, and the goroutine reads the 
item.done field when it finally runs close() instead of capturing the channel 
created by this call. If waitInflightRequests is invoked more than once for the 
same group (e.g., a retry/stale-task path), the channel returned by the earlier 
call may never be closed, so its callers can block forever, and two goroutines 
may both close the latest channel, which panics. Capture the newly created 
channel in a local variable and close that, and/or return the existing done 
channel if one is already set instead of overwriting it.
   ```suggestion
        // If a done channel already exists for this group, reuse it.
        if item.done != nil {
                ch := item.done
                s.RWMutex.Unlock()
                return ch
        }
        ch := make(chan struct{})
        item.done = ch
        s.RWMutex.Unlock()
        go func(wg *sync.WaitGroup, done chan struct{}) {
                wg.Wait()
                close(done)
        }(&item.wg, ch)
        return ch
   ```



##########
banyand/liaison/grpc/deletion.go:
##########
@@ -0,0 +1,459 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package grpc
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "sync"
+       "time"
+
+       "google.golang.org/protobuf/proto"
+       "google.golang.org/protobuf/types/known/timestamppb"
+
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       propertyv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+       "github.com/apache/skywalking-banyandb/banyand/metadata"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+const (
+       internalDeletionTaskGroup        = "_deletion_task"
+       internalDeletionTaskPropertyName = "_deletion_task"
+       taskDataTagName                  = "task_data"
+       taskStaleTimeout                 = 10 * time.Minute
+)
+
+type propertyApplier interface {
+       Apply(ctx context.Context, req *propertyv1.ApplyRequest) 
(*propertyv1.ApplyResponse, error)
+       Query(ctx context.Context, req *propertyv1.QueryRequest) 
(*propertyv1.QueryResponse, error)
+}
+
+type groupDeletionTaskManager struct {
+       schemaRegistry metadata.Repo
+       propServer     propertyApplier
+       log            *logger.Logger
+       groupRepo      *groupRepo
+       tasks          sync.Map
+}
+
+func newGroupDeletionTaskManager(
+       schemaRegistry metadata.Repo, propServer *propertyServer, gr 
*groupRepo, l *logger.Logger,
+) *groupDeletionTaskManager {
+       return &groupDeletionTaskManager{
+               schemaRegistry: schemaRegistry,
+               propServer:     propServer,
+               groupRepo:      gr,
+               log:            l,
+       }
+}
+
+func (m *groupDeletionTaskManager) initPropertyStorage(ctx context.Context) 
error {
+       group := &commonv1.Group{
+               Metadata: &commonv1.Metadata{
+                       Name: internalDeletionTaskGroup,
+               },
+               Catalog: commonv1.Catalog_CATALOG_PROPERTY,
+               ResourceOpts: &commonv1.ResourceOpts{
+                       ShardNum: 1,
+               },
+       }
+       _, getGroupErr := m.schemaRegistry.GroupRegistry().GetGroup(ctx, 
internalDeletionTaskGroup)
+       if getGroupErr != nil {
+               if createErr := 
m.schemaRegistry.GroupRegistry().CreateGroup(ctx, group); createErr != nil {
+                       return fmt.Errorf("failed to create internal deletion 
task group: %w", createErr)
+               }
+       }
+       propSchema := &databasev1.Property{
+               Metadata: &commonv1.Metadata{
+                       Group: internalDeletionTaskGroup,
+                       Name:  internalDeletionTaskPropertyName,
+               },
+               Tags: []*databasev1.TagSpec{
+                       {
+                               Name: taskDataTagName,
+                               Type: databasev1.TagType_TAG_TYPE_DATA_BINARY,
+                       },
+               },
+       }
+       _, getPropErr := m.schemaRegistry.PropertyRegistry().GetProperty(ctx, 
propSchema.Metadata)
+       if getPropErr != nil {
+               if createErr := 
m.schemaRegistry.PropertyRegistry().CreateProperty(ctx, propSchema); createErr 
!= nil {
+                       return fmt.Errorf("failed to create internal deletion 
task property schema: %w", createErr)
+               }
+       }

Review Comment:
   initPropertyStorage creates the internal deletion-task property schema 
whenever GetProperty returns any error, the same pattern it uses just above 
for the internal group via GetGroup. The schema should be created only when 
the lookup reports NotFound; any other error should be returned so startup 
doesn't silently proceed with a broken schema registry.



##########
banyand/property/db/db.go:
##########
@@ -380,6 +382,27 @@ func (db *database) getShard(group string, id 
common.ShardID) (*shard, bool) {
        return nil, false
 }
 
+// Drop closes and removes all shards for the given group and deletes the 
group directory.
+func (db *database) Drop(groupName string) error {
+       value, ok := db.groups.LoadAndDelete(groupName)
+       if !ok {
+               return nil
+       }
+       gs := value.(*groupShards)
+       sLst := gs.shards.Load()
+       if sLst != nil {
+               var err error
+               for _, s := range *sLst {
+                       multierr.AppendInto(&err, s.close())
+               }
+               if err != nil {
+                       return err
+               }
+       }
+       db.lfs.MustRMAll(gs.location)
+       return nil

Review Comment:
   Drop uses FileSystem.MustRMAll, which panics on repeated remove failures. 
Since Drop is invoked as part of group deletion, a filesystem issue could crash 
the whole process instead of returning an error and marking the deletion task 
failed. Prefer a non-panicking removal API or add a recover wrapper (similar to 
tsdb.Drop / wqueue.Drop) and return the removal error.



##########
banyand/metadata/schema/collector.go:
##########
@@ -199,6 +208,72 @@ func (icr *InfoCollectorRegistry) 
collectLiaisonInfoLocal(ctx context.Context, c
        return nil, nil
 }
 
+// DropGroup drops the group data files on all nodes.
+func (icr *InfoCollectorRegistry) DropGroup(ctx context.Context, catalog 
commonv1.Catalog, group string) error {
+       if localErr := icr.dropGroupLocal(ctx, catalog, group); localErr != nil 
{
+               return fmt.Errorf("failed to drop group locally: %w", localErr)
+       }

Review Comment:
   DropGroup's comment says it drops group data files on all nodes, but the 
implementation returns immediately on a local drop error and never attempts 
remote drops. Consider broadcasting regardless and returning a combined error 
(e.g., multierr of local + remote), so one node failure doesn't prevent cleanup 
attempts elsewhere.



##########
banyand/liaison/grpc/deletion.go:
##########
@@ -0,0 +1,459 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package grpc
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "sync"
+       "time"
+
+       "google.golang.org/protobuf/proto"
+       "google.golang.org/protobuf/types/known/timestamppb"
+
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       propertyv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+       "github.com/apache/skywalking-banyandb/banyand/metadata"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+const (
+       internalDeletionTaskGroup        = "_deletion_task"
+       internalDeletionTaskPropertyName = "_deletion_task"
+       taskDataTagName                  = "task_data"
+       taskStaleTimeout                 = 10 * time.Minute
+)
+
+type propertyApplier interface {
+       Apply(ctx context.Context, req *propertyv1.ApplyRequest) 
(*propertyv1.ApplyResponse, error)
+       Query(ctx context.Context, req *propertyv1.QueryRequest) 
(*propertyv1.QueryResponse, error)
+}
+
+type groupDeletionTaskManager struct {
+       schemaRegistry metadata.Repo
+       propServer     propertyApplier
+       log            *logger.Logger
+       groupRepo      *groupRepo
+       tasks          sync.Map
+}
+
+func newGroupDeletionTaskManager(
+       schemaRegistry metadata.Repo, propServer *propertyServer, gr 
*groupRepo, l *logger.Logger,
+) *groupDeletionTaskManager {
+       return &groupDeletionTaskManager{
+               schemaRegistry: schemaRegistry,
+               propServer:     propServer,
+               groupRepo:      gr,
+               log:            l,
+       }
+}
+
+func (m *groupDeletionTaskManager) initPropertyStorage(ctx context.Context) 
error {
+       group := &commonv1.Group{
+               Metadata: &commonv1.Metadata{
+                       Name: internalDeletionTaskGroup,
+               },
+               Catalog: commonv1.Catalog_CATALOG_PROPERTY,
+               ResourceOpts: &commonv1.ResourceOpts{
+                       ShardNum: 1,
+               },
+       }
+       _, getGroupErr := m.schemaRegistry.GroupRegistry().GetGroup(ctx, 
internalDeletionTaskGroup)
+       if getGroupErr != nil {
+               if createErr := 
m.schemaRegistry.GroupRegistry().CreateGroup(ctx, group); createErr != nil {
+                       return fmt.Errorf("failed to create internal deletion 
task group: %w", createErr)
+               }
+       }

Review Comment:
   initPropertyStorage creates the internal deletion-task group whenever 
GetGroup returns any error. This can mask real failures (e.g., transient 
store/permission errors) and may attempt to CreateGroup in an unhealthy state. 
Consider only creating the group when errors.Is(getGroupErr, 
schema.ErrGRPCResourceNotFound), and return/propagate other errors.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to