Copilot commented on code in PR #1005:
URL:
https://github.com/apache/skywalking-banyandb/pull/1005#discussion_r2919737654
##########
banyand/liaison/grpc/discovery.go:
##########
@@ -138,13 +141,65 @@ func (i identity) String() string {
var _ schema.EventHandler = (*groupRepo)(nil)
+type groupInflight struct {
+ done chan struct{}
+ wg sync.WaitGroup
+}
+
type groupRepo struct {
schema.UnimplementedOnInitHandler
log *logger.Logger
resourceOpts map[string]*commonv1.ResourceOpts
+ inflight map[string]*groupInflight
sync.RWMutex
}
+func (s *groupRepo) acquireRequest(groupName string) error {
+ s.RWMutex.Lock()
+ defer s.RWMutex.Unlock()
+ item, ok := s.inflight[groupName]
+ if ok && item.done != nil {
+ return fmt.Errorf("%s: %w", groupName, errGroupPendingDeletion)
+ }
+ if !ok {
+ item = &groupInflight{}
+ s.inflight[groupName] = item
+ }
+ item.wg.Add(1)
+ return nil
+}
+
+func (s *groupRepo) releaseRequest(groupName string) {
+ s.RWMutex.RLock()
+ item, ok := s.inflight[groupName]
+ s.RWMutex.RUnlock()
+ if ok {
+ item.wg.Done()
+ }
+}
+
+func (s *groupRepo) waitInflightRequests(groupName string) <-chan struct{} {
+ s.RWMutex.Lock()
+ item, ok := s.inflight[groupName]
+ if !ok {
+ item = &groupInflight{}
+ s.inflight[groupName] = item
+ }
+ item.done = make(chan struct{})
+ s.RWMutex.Unlock()
+ go func() {
+ item.wg.Wait()
+ close(item.done)
+ }()
+ return item.done
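Review Comment:
waitInflightRequests overwrites item.done on every call, and the goroutine it
starts reads item.done only at close time instead of closing the channel that
this call created (that read is also unsynchronized with the write made under
the lock, i.e. a data race). If waitInflightRequests is invoked more than once
for the same group (e.g., a retry/stale-task path), every such goroutine closes
the latest channel: the earlier returned channel is never closed, so callers
waiting on it can deadlock, and the second close of the latest channel panics
with "close of closed channel". Capture the newly created channel in a local
variable and close that, and/or return the existing done channel if one is
already set instead of overwriting it.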
```suggestion
// If a done channel already exists for this group, reuse it.
if item.done != nil {
ch := item.done
s.RWMutex.Unlock()
return ch
}
ch := make(chan struct{})
item.done = ch
s.RWMutex.Unlock()
go func(wg *sync.WaitGroup, done chan struct{}) {
wg.Wait()
close(done)
}(&item.wg, ch)
return ch
```
##########
banyand/liaison/grpc/deletion.go:
##########
@@ -0,0 +1,459 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package grpc
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "sync"
+ "time"
+
+ "google.golang.org/protobuf/proto"
+ "google.golang.org/protobuf/types/known/timestamppb"
+
+ commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+ databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+ propertyv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+ "github.com/apache/skywalking-banyandb/banyand/metadata"
+ "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+const (
+ internalDeletionTaskGroup = "_deletion_task"
+ internalDeletionTaskPropertyName = "_deletion_task"
+ taskDataTagName = "task_data"
+ taskStaleTimeout = 10 * time.Minute
+)
+
+type propertyApplier interface {
+ Apply(ctx context.Context, req *propertyv1.ApplyRequest)
(*propertyv1.ApplyResponse, error)
+ Query(ctx context.Context, req *propertyv1.QueryRequest)
(*propertyv1.QueryResponse, error)
+}
+
+type groupDeletionTaskManager struct {
+ schemaRegistry metadata.Repo
+ propServer propertyApplier
+ log *logger.Logger
+ groupRepo *groupRepo
+ tasks sync.Map
+}
+
+func newGroupDeletionTaskManager(
+ schemaRegistry metadata.Repo, propServer *propertyServer, gr
*groupRepo, l *logger.Logger,
+) *groupDeletionTaskManager {
+ return &groupDeletionTaskManager{
+ schemaRegistry: schemaRegistry,
+ propServer: propServer,
+ groupRepo: gr,
+ log: l,
+ }
+}
+
+func (m *groupDeletionTaskManager) initPropertyStorage(ctx context.Context)
error {
+ group := &commonv1.Group{
+ Metadata: &commonv1.Metadata{
+ Name: internalDeletionTaskGroup,
+ },
+ Catalog: commonv1.Catalog_CATALOG_PROPERTY,
+ ResourceOpts: &commonv1.ResourceOpts{
+ ShardNum: 1,
+ },
+ }
+ _, getGroupErr := m.schemaRegistry.GroupRegistry().GetGroup(ctx,
internalDeletionTaskGroup)
+ if getGroupErr != nil {
+ if createErr :=
m.schemaRegistry.GroupRegistry().CreateGroup(ctx, group); createErr != nil {
+ return fmt.Errorf("failed to create internal deletion
task group: %w", createErr)
+ }
+ }
+ propSchema := &databasev1.Property{
+ Metadata: &commonv1.Metadata{
+ Group: internalDeletionTaskGroup,
+ Name: internalDeletionTaskPropertyName,
+ },
+ Tags: []*databasev1.TagSpec{
+ {
+ Name: taskDataTagName,
+ Type: databasev1.TagType_TAG_TYPE_DATA_BINARY,
+ },
+ },
+ }
+ _, getPropErr := m.schemaRegistry.PropertyRegistry().GetProperty(ctx,
propSchema.Metadata)
+ if getPropErr != nil {
+ if createErr :=
m.schemaRegistry.PropertyRegistry().CreateProperty(ctx, propSchema); createErr
!= nil {
+ return fmt.Errorf("failed to create internal deletion
task property schema: %w", createErr)
+ }
+ }
Review Comment:
initPropertyStorage also creates the internal deletion-task property schema
whenever GetProperty returns any error, the same pattern it uses for the
internal deletion-task group. The schema should be created only when
GetProperty reports NotFound; other errors should be returned so startup
doesn't silently proceed with a broken schema registry.
##########
banyand/property/db/db.go:
##########
@@ -380,6 +382,27 @@ func (db *database) getShard(group string, id common.ShardID) (*shard, bool) {
return nil, false
}
+// Drop closes and removes all shards for the given group and deletes the
group directory.
+func (db *database) Drop(groupName string) error {
+ value, ok := db.groups.LoadAndDelete(groupName)
+ if !ok {
+ return nil
+ }
+ gs := value.(*groupShards)
+ sLst := gs.shards.Load()
+ if sLst != nil {
+ var err error
+ for _, s := range *sLst {
+ multierr.AppendInto(&err, s.close())
+ }
+ if err != nil {
+ return err
+ }
+ }
+ db.lfs.MustRMAll(gs.location)
+ return nil
Review Comment:
Drop uses FileSystem.MustRMAll, which panics on repeated remove failures.
Since Drop is invoked as part of group deletion, a filesystem issue could crash
the whole process instead of returning an error and marking the deletion task
failed. Prefer a non-panicking removal API or add a recover wrapper (similar to
tsdb.Drop / wqueue.Drop) and return the removal error.
##########
banyand/metadata/schema/collector.go:
##########
@@ -199,6 +208,72 @@ func (icr *InfoCollectorRegistry) collectLiaisonInfoLocal(ctx context.Context, c
return nil, nil
}
+// DropGroup drops the group data files on all nodes.
+func (icr *InfoCollectorRegistry) DropGroup(ctx context.Context, catalog
commonv1.Catalog, group string) error {
+ if localErr := icr.dropGroupLocal(ctx, catalog, group); localErr != nil
{
+ return fmt.Errorf("failed to drop group locally: %w", localErr)
+ }
Review Comment:
DropGroup's doc comment says it drops the group data files on all nodes, but
the implementation returns immediately when the local drop fails and never
attempts the remote drops. Consider broadcasting the drop to the remote nodes
regardless of the local result and returning a combined error (e.g., multierr
of local + remote), so a failure on one node doesn't prevent cleanup attempts
on the others.
##########
banyand/liaison/grpc/deletion.go:
##########
@@ -0,0 +1,459 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package grpc
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "sync"
+ "time"
+
+ "google.golang.org/protobuf/proto"
+ "google.golang.org/protobuf/types/known/timestamppb"
+
+ commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+ databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+ propertyv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+ "github.com/apache/skywalking-banyandb/banyand/metadata"
+ "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+const (
+ internalDeletionTaskGroup = "_deletion_task"
+ internalDeletionTaskPropertyName = "_deletion_task"
+ taskDataTagName = "task_data"
+ taskStaleTimeout = 10 * time.Minute
+)
+
+type propertyApplier interface {
+ Apply(ctx context.Context, req *propertyv1.ApplyRequest)
(*propertyv1.ApplyResponse, error)
+ Query(ctx context.Context, req *propertyv1.QueryRequest)
(*propertyv1.QueryResponse, error)
+}
+
+type groupDeletionTaskManager struct {
+ schemaRegistry metadata.Repo
+ propServer propertyApplier
+ log *logger.Logger
+ groupRepo *groupRepo
+ tasks sync.Map
+}
+
+func newGroupDeletionTaskManager(
+ schemaRegistry metadata.Repo, propServer *propertyServer, gr
*groupRepo, l *logger.Logger,
+) *groupDeletionTaskManager {
+ return &groupDeletionTaskManager{
+ schemaRegistry: schemaRegistry,
+ propServer: propServer,
+ groupRepo: gr,
+ log: l,
+ }
+}
+
+func (m *groupDeletionTaskManager) initPropertyStorage(ctx context.Context)
error {
+ group := &commonv1.Group{
+ Metadata: &commonv1.Metadata{
+ Name: internalDeletionTaskGroup,
+ },
+ Catalog: commonv1.Catalog_CATALOG_PROPERTY,
+ ResourceOpts: &commonv1.ResourceOpts{
+ ShardNum: 1,
+ },
+ }
+ _, getGroupErr := m.schemaRegistry.GroupRegistry().GetGroup(ctx,
internalDeletionTaskGroup)
+ if getGroupErr != nil {
+ if createErr :=
m.schemaRegistry.GroupRegistry().CreateGroup(ctx, group); createErr != nil {
+ return fmt.Errorf("failed to create internal deletion
task group: %w", createErr)
+ }
+ }
Review Comment:
initPropertyStorage creates the internal deletion-task group whenever
GetGroup returns any error. This can mask real failures (e.g., transient
store/permission errors) and may attempt to CreateGroup in an unhealthy state.
Consider only creating the group when errors.Is(getGroupErr,
schema.ErrGRPCResourceNotFound), and returning all other errors to the caller.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]