Copilot commented on code in PR #1047: URL: https://github.com/apache/skywalking-banyandb/pull/1047#discussion_r3057522471
########## test/integration/replication/benchmark/helm.go: ########## @@ -0,0 +1,71 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package benchmark + +import ( + "context" + "path/filepath" + "strings" +) + +const ( + benchmarkRelease = "banyandb-bench" + localImage = "apache/skywalking-banyandb:latest" + localSlimImage = "apache/skywalking-banyandb:latest-slim" +) + +func buildLocalImage(ctx context.Context, repoRoot string) error { + env := map[string]string{ + "RELEASE_VERSION": "local", + } + if _, err := runCommandEnv(ctx, env, "make", "-C", filepath.Join(repoRoot, "ui"), "build"); err != nil { + return err + } + if _, err := runCommandEnv(ctx, env, "make", "-C", filepath.Join(repoRoot, "banyand"), "release"); err != nil { + return err + } + if _, err := runCommandEnv(ctx, env, "make", "-C", filepath.Join(repoRoot, "banyand"), "docker"); err != nil { + return err + } Review Comment: `buildLocalImage` runs `make -C ui build`, which introduces an extra Node/npm prerequisite and adds significant runtime, but the BanyanDB Docker image build (see `banyand/Dockerfile`) does not copy UI artifacts. 
Unless there is another dependency not shown here, consider dropping the UI build step (or gating it behind an explicit flag) to keep the benchmark runner lightweight and reduce failure modes. ########## test/integration/replication/benchmark/metrics.go: ########## @@ -0,0 +1,288 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package benchmark + +import ( + "bytes" + "context" + "errors" + "fmt" + "io" + "net/http" + "sort" + "time" + + "github.com/prometheus/common/expfmt" + "github.com/prometheus/common/model" +) + +// PromMetrics contains raw counters and gauges scraped from /metrics endpoints. +type PromMetrics struct { + CPUTotalSeconds float64 + RSSBytes float64 + CPUNum float64 + TotalMemBytes float64 +} + +// AggregatedSample represents one aggregated metrics scrape point. +type AggregatedSample struct { + Timestamp time.Time + CPUSeconds float64 + RSSBytes float64 + CPUNum float64 + TotalMemory float64 +} + +// AggregatedSeries stores samples collected over a benchmark phase. +type AggregatedSeries struct { + Samples []AggregatedSample +} + +// ResourceStatsSeries summarizes CPU and memory usage over a phase. 
+type ResourceStatsSeries struct { + MeanCPUPercent float64 + PeakCPUPercent float64 + PeakRSSBytes int64 + PeakRSSPercent float64 +} + +func parsePromMetrics(r io.Reader) (PromMetrics, error) { + parser := expfmt.NewTextParser(model.UTF8Validation) + families, err := parser.TextToMetricFamilies(r) + if err != nil { + return PromMetrics{}, err + } + metrics := PromMetrics{} + if mf, ok := families["process_cpu_seconds_total"]; ok { + for _, m := range mf.Metric { + if m.Counter != nil { + metrics.CPUTotalSeconds += m.Counter.GetValue() + } else if m.Untyped != nil { + metrics.CPUTotalSeconds += m.Untyped.GetValue() + } + } + } + if mf, ok := families["process_resident_memory_bytes"]; ok { + for _, m := range mf.Metric { + if m.Gauge != nil { + metrics.RSSBytes += m.Gauge.GetValue() + } else if m.Untyped != nil { + metrics.RSSBytes += m.Untyped.GetValue() + } + } + } + if mf, ok := families["banyandb_system_cpu_num"]; ok { + for _, m := range mf.Metric { + if m.Gauge != nil { + metrics.CPUNum = m.Gauge.GetValue() + break + } + if m.Untyped != nil { + metrics.CPUNum = m.Untyped.GetValue() + break + } + } + } + if mf, ok := families["banyandb_system_memory_state"]; ok { + for _, m := range mf.Metric { + val := 0.0 + if m.Gauge != nil { + val = m.Gauge.GetValue() + } else if m.Untyped != nil { + val = m.Untyped.GetValue() + } + if val == 0 { + continue + } + for _, label := range m.Label { + if label.GetName() == "kind" && label.GetValue() == "total" { + metrics.TotalMemBytes += val + } + } + } + } + return metrics, nil +} + +func scrapeMetrics(ctx context.Context, url string) (PromMetrics, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return PromMetrics{}, err + } + resp, err := http.DefaultClient.Do(req) + if err != nil { + return PromMetrics{}, err + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return PromMetrics{}, fmt.Errorf("metrics endpoint %s returned %s", url, resp.Status) + } + 
buf := new(bytes.Buffer) + _, err = io.Copy(buf, resp.Body) + if err != nil { + return PromMetrics{}, err + } + return parsePromMetrics(bytes.NewReader(buf.Bytes())) +} Review Comment: `scrapeMetrics` copies the entire `/metrics` response body into a `bytes.Buffer` before parsing. This can be unnecessarily memory-heavy because Prometheus endpoints can be large; `parsePromMetrics` can read directly from `resp.Body`. Consider parsing the response stream directly (optionally wrapping with a size limit) to reduce allocations and peak memory during collection. ########## test/integration/replication/benchmark/runner.go: ########## @@ -0,0 +1,228 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package benchmark + +import ( + "context" + "fmt" + "time" + + "google.golang.org/grpc" + + "github.com/apache/skywalking-banyandb/pkg/test" +) + +func runBenchmarkRF(ctx context.Context, repoRoot string, cfg Config, rf int) (RFResult, error) { + result := RFResult{ReplicationFactor: rf} + if err := cfg.Validate(); err != nil { + return result, err + } + namespace := fmt.Sprintf("banyandb-bench-rf-%d-%d", rf, time.Now().UnixNano()) + + if err := installChart(ctx, repoRoot, namespace, cfg); err != nil { + return result, err + } + defer func() { + _ = uninstallChart(ctx, namespace) + _ = deleteNamespace(ctx, namespace) + }() + + if _, err := runCommand(ctx, "kubectl", "-n", namespace, "wait", "--for=condition=ready", "pod", "--all", "--timeout=600s"); err != nil { + return result, err + } + + pods, err := fetchPods(ctx, namespace) + if err != nil { + return result, err + } + services, err := fetchServices(ctx, namespace) + if err != nil { + return result, err + } + dataPods := discoverDataPods(pods) + liaisonPods := discoverLiaisonPods(pods) + if len(dataPods) == 0 { + return result, fmt.Errorf("no data pods discovered in namespace %s", namespace) + } + if len(liaisonPods) == 0 { + return result, fmt.Errorf("no liaison pods discovered in namespace %s", namespace) + } + grpcService, err := discoverGRPCService(services) + if err != nil { + return result, err + } + + grpcPorts, err := test.AllocateFreePorts(1) + if err != nil { + return result, err + } + grpcPF, err := startPortForward(ctx, namespace, "svc/"+grpcService.Metadata.Name, grpcPorts[0], 17912) Review Comment: This call to `startPortForward` hard-codes the remote gRPC port as `17912`, even though this package already defines `grpcPort` in kube.go. Using the shared constant here would keep the two port values from drifting apart if the chart/service port changes.
```suggestion grpcPF, err := startPortForward(ctx, namespace, "svc/"+grpcService.Metadata.Name, grpcPorts[0], grpcPort) ``` ########## test/integration/replication/benchmark/runner.go: ########## @@ -0,0 +1,228 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package benchmark + +import ( + "context" + "fmt" + "time" + + "google.golang.org/grpc" + + "github.com/apache/skywalking-banyandb/pkg/test" +) + +func runBenchmarkRF(ctx context.Context, repoRoot string, cfg Config, rf int) (RFResult, error) { + result := RFResult{ReplicationFactor: rf} + if err := cfg.Validate(); err != nil { + return result, err + } + namespace := fmt.Sprintf("banyandb-bench-rf-%d-%d", rf, time.Now().UnixNano()) + + if err := installChart(ctx, repoRoot, namespace, cfg); err != nil { + return result, err + } + defer func() { + _ = uninstallChart(ctx, namespace) + _ = deleteNamespace(ctx, namespace) Review Comment: Deferred cleanup uses the benchmark context (`ctx`) for `uninstallChart` / `deleteNamespace`. If the RF run hits the 45m timeout or is canceled, `ctx` will already be canceled and the cleanup commands will likely fail immediately, leaving the Helm release/namespace behind. 
Use a non-cancelable cleanup context (e.g., `context.WithoutCancel(ctx)` with its own short timeout) inside the deferred function so cleanup still runs after cancellation or timeouts. Note that `context.WithoutCancel` requires Go 1.21 or later. ```suggestion cleanupCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 5*time.Minute) defer cancel() _ = uninstallChart(cleanupCtx, namespace) _ = deleteNamespace(cleanupCtx, namespace) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
