Copilot commented on code in PR #64902:
URL: https://github.com/apache/airflow/pull/64902#discussion_r3066503249
##########
dev/breeze/src/airflow_breeze/utils/kubernetes_utils.py:
##########
@@ -386,29 +394,42 @@ def set_random_cluster_ports(python: str,
kubernetes_version: str, output: Outpu
"""
forwarded_port_number = _get_free_port()
k8s_api_server_port = _get_free_port()
+ jaeger_port_number = _get_free_port()
+ prometheus_port_number = _get_free_port()
+ grafana_port_number = _get_free_port()
get_console(output=output).print(
- f"[info]Random ports: K8S API: {k8s_api_server_port}, API Server:
{forwarded_port_number}"
+ f"[info]Random ports: K8S API: {k8s_api_server_port}, API Server:
{forwarded_port_number}, "
+ f"Jaeger: {jaeger_port_number}, Prometheus: {prometheus_port_number},
Grafana: {grafana_port_number}"
)
cluster_conf_path = get_kind_cluster_config_path(python=python,
kubernetes_version=kubernetes_version)
config = (
(AIRFLOW_ROOT_PATH / "scripts" / "ci" / "kubernetes" /
"kind-cluster-conf.yaml")
.read_text()
.replace("{{FORWARDED_PORT_NUMBER}}", str(forwarded_port_number))
.replace("{{API_SERVER_PORT}}", str(k8s_api_server_port))
+ .replace("{{JAEGER_PORT_NUMBER}}", str(jaeger_port_number))
+ .replace("{{PROMETHEUS_PORT_NUMBER}}", str(prometheus_port_number))
+ .replace("{{GRAFANA_PORT_NUMBER}}", str(grafana_port_number))
)
cluster_conf_path.write_text(config)
get_console(output=output).print(f"[info]Config created in
{cluster_conf_path}:\n")
get_console(output=output).print(config)
get_console(output=output).print("\n")
-def get_kubernetes_port_numbers(python: str, kubernetes_version: str) ->
tuple[int, int]:
+def get_kubernetes_port_numbers(python: str, kubernetes_version: str) ->
tuple[int, int, int, int, int]:
+ """Returns (k8s_api_server_port, api_server_port, jaeger_port,
prometheus_port, grafana_port)."""
conf = _get_kind_cluster_config_content(python=python,
kubernetes_version=kubernetes_version)
if not conf:
- return 0, 0
+ return 0, 0, 0, 0, 0
k8s_api_server_port = conf["networking"]["apiServerPort"]
- api_server_port = conf["nodes"][1]["extraPortMappings"][0]["hostPort"]
- return k8s_api_server_port, api_server_port
+ mappings = conf["nodes"][1]["extraPortMappings"]
+ # Order matches kind-cluster-conf.yaml: api-server(0), jaeger(1),
prometheus(2), grafana(3)
+ api_server_port = mappings[0]["hostPort"]
+ jaeger_port = mappings[1]["hostPort"] if len(mappings) > 1 else 0
+ prometheus_port = mappings[2]["hostPort"] if len(mappings) > 2 else 0
+ grafana_port = mappings[3]["hostPort"] if len(mappings) > 3 else 0
Review Comment:
Reading ports by hard-coded indices is brittle: it assumes a specific node
index (`nodes[1]`) and a fixed mapping order. A more robust approach is to
look up each mapping by its `containerPort` (e.g., 30007/30008/30009/30010)
and to select whichever node actually has `extraPortMappings`. That way,
reordering the kind config cannot silently swap ports.
```suggestion
    nodes = conf.get("nodes", [])
    mappings = next((node.get("extraPortMappings", []) for node in nodes if node.get("extraPortMappings")), [])
    mapping_by_container_port = {
        mapping["containerPort"]: mapping["hostPort"]
        for mapping in mappings
        if "containerPort" in mapping and "hostPort" in mapping
    }
    api_server_port = mapping_by_container_port.get(30007, 0)
    jaeger_port = mapping_by_container_port.get(30008, 0)
    prometheus_port = mapping_by_container_port.get(30009, 0)
    grafana_port = mapping_by_container_port.get(30010, 0)
```
##########
chart/templates/_helpers.yaml:
##########
@@ -140,6 +140,24 @@ If release name contains chart name it will be used as a
full name.
name: {{ template "opensearch_secret" . }}
key: connection
{{- end }}
+ {{- if or .Values.otelCollector.tracesEnabled
.Values.otelCollector.metricsEnabled }}
+ - name: OTEL_SERVICE_NAME
+ value: "airflow"
+ - name: OTEL_EXPORTER_OTLP_PROTOCOL
+ value: "http/protobuf"
+ {{- end }}
+ {{- if .Values.otelCollector.tracesEnabled }}
+ - name: OTEL_TRACES_EXPORTER
+ value: "otlp_proto_http"
+ - name: OTEL_EXPORTER_OTLP_TRACES_ENDPOINT
+ value: "http://{{ include "airflow.fullname" . }}-otel-collector:{{
.Values.ports.otelCollectorOtlpHttp }}/v1/traces"
+ {{- end }}
+ {{- if .Values.otelCollector.metricsEnabled }}
+ - name: OTEL_EXPORTER_OTLP_METRICS_ENDPOINT
+ value: "http://{{ include "airflow.fullname" . }}-otel-collector:{{
.Values.ports.otelCollectorOtlpHttp }}/v1/metrics"
+ - name: OTEL_METRIC_EXPORT_INTERVAL
+ value: "30000"
+ {{- end }}
Review Comment:
`OTEL_TRACES_EXPORTER` is set to `otlp_proto_http`. That is not one of the
exporter values defined by the OpenTelemetry SDK environment-variable
specification (e.g. `otlp`, `console`, `none`). At most it is a
language-specific entry-point name, so it is not portable across SDKs.

Use a spec-defined value such as `otlp`, and rely on
`OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf` to select the HTTP transport. Also
consider gating these env vars on `.Values.otelCollector.enabled`, so that
collector endpoints are not emitted when the collector is not deployed.
##########
chart/templates/configmaps/otel-collector-configmap.yaml:
##########
@@ -0,0 +1,75 @@
+{{/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+*/}}
+
+################################
+## OTel Collector ConfigMap
+################################
+{{- if .Values.otelCollector.enabled }}
+apiVersion: v1
+kind: ConfigMap
+metadata:
+ name: {{ include "airflow.fullname" . }}-otel-collector
+ labels:
+ tier: airflow
+ component: config
+ release: {{ .Release.Name }}
+ chart: "{{ .Chart.Name }}-{{ .Chart.Version }}"
+ heritage: {{ .Release.Service }}
+ {{- with .Values.labels }}
+ {{- toYaml . | nindent 4 }}
+ {{- end }}
+data:
+ config.yml: |
+ extensions:
+ health_check:
+ endpoint: 0.0.0.0:13133
+
+ receivers:
+ otlp:
+ protocols:
+ grpc:
+ endpoint: 0.0.0.0:{{ .Values.ports.otelCollectorOtlpGrpc }}
+ http:
+ endpoint: 0.0.0.0:{{ .Values.ports.otelCollectorOtlpHttp }}
+
+ processors:
+ batch: {}
+
+ exporters:
+ otlp/jaeger:
+ endpoint: "{{ include "airflow.fullname" . }}-jaeger:{{
.Values.ports.jaegerOtlpGrpc }}"
+ tls:
+ insecure: true
+ logging:
+ verbosity: basic
+ prometheus:
+ endpoint: "0.0.0.0:{{ .Values.ports.otelCollectorScrape }}"
+
+ service:
+ extensions: [health_check]
+ pipelines:
+ traces:
+ receivers: [otlp]
+ processors: [batch]
+ exporters: [logging, otlp/jaeger]
+ metrics:
+ receivers: [otlp]
+ processors: [batch]
+ exporters: [logging, prometheus]
Review Comment:
The collector config always enables both the `traces` and `metrics` pipelines
and always configures the Jaeger exporter endpoint. It does this even if the
user enables only metrics (or does not deploy Jaeger at all), which can cause
persistent export failures/retries and noisy logs.

Make the exporters and pipelines conditional on
`.Values.otelCollector.tracesEnabled` / `.Values.otelCollector.metricsEnabled`.
Also make Helm fail with a clear error (or generate a minimal valid config)
when `otelCollector.enabled=true` but both pipelines would be disabled, since
a collector config without any pipeline is not valid.
##########
dev/breeze/src/airflow_breeze/commands/kubernetes_commands.py:
##########
@@ -1349,6 +1368,23 @@ def _deploy_airflow(
check=False,
)
if result.returncode == 0:
+ otel_flags = _otel_flags_from_options(extra_options)
+ observability_files: list[str] = []
+ if "tracesEnabled" in otel_flags:
+ observability_files.append(str(SCRIPTS_CI_KUBERNETES_PATH /
"observability" / "jaeger.yaml"))
+ if "metricsEnabled" in otel_flags:
+ observability_files.append(str(SCRIPTS_CI_KUBERNETES_PATH /
"observability" / "prometheus.yaml"))
+ observability_files.append(str(SCRIPTS_CI_KUBERNETES_PATH /
"observability" / "grafana.yaml"))
+ if observability_files:
+ get_console(output=output).print("[info]Deploying observability
backends for enabled OTel flags.")
+ run_command_with_k8s_env(
+ ["kubectl", "apply", "--namespace", HELM_AIRFLOW_NAMESPACE]
+ + [arg for f in observability_files for arg in ("-f", f)],
+ python=python,
+ kubernetes_version=kubernetes_version,
+ output=output,
+ check=False,
+ )
Review Comment:
This only ever `apply`s observability resources. If a user previously
enabled metrics/traces and later disables them, the Jaeger/Prometheus/Grafana
resources remain running (stale backends), because nothing is ever deleted or
pruned.

Consider deleting the corresponding resources when their flags are not
enabled. Alternatively, apply all observability manifests as one labeled set
with pruning (e.g. `kubectl apply --prune -l <label>`, or a labeled
kustomization), so that the deployed state matches the requested flags.
##########
scripts/ci/prek/lint_json_schema.py:
##########
@@ -111,7 +111,7 @@ def load_file(file_path: str):
return json.load(input_file)
elif file_path.lower().endswith((".yaml", ".yml")):
with open(file_path) as input_file:
- return yaml.safe_load(input_file)
+ return list(yaml.safe_load_all(input_file))
Review Comment:
`load_file` now always returns a list for YAML inputs, even for single-document
YAML. This changes its behavior/API compared with the previous version, which
returned a single object.

Update the `load_file` docstring/type hints to reflect this. Alternatively,
consider preserving the previous behavior by returning the document directly
when the YAML contains only one document, while still supporting
multi-document YAML when needed.
##########
dev/breeze/src/airflow_breeze/commands/kubernetes_commands.py:
##########
@@ -1349,6 +1368,23 @@ def _deploy_airflow(
check=False,
)
if result.returncode == 0:
+ otel_flags = _otel_flags_from_options(extra_options)
+ observability_files: list[str] = []
+ if "tracesEnabled" in otel_flags:
+ observability_files.append(str(SCRIPTS_CI_KUBERNETES_PATH /
"observability" / "jaeger.yaml"))
+ if "metricsEnabled" in otel_flags:
+ observability_files.append(str(SCRIPTS_CI_KUBERNETES_PATH /
"observability" / "prometheus.yaml"))
+ observability_files.append(str(SCRIPTS_CI_KUBERNETES_PATH /
"observability" / "grafana.yaml"))
+ if observability_files:
+ get_console(output=output).print("[info]Deploying observability
backends for enabled OTel flags.")
+ run_command_with_k8s_env(
+ ["kubectl", "apply", "--namespace", HELM_AIRFLOW_NAMESPACE]
+ + [arg for f in observability_files for arg in ("-f", f)],
+ python=python,
+ kubernetes_version=kubernetes_version,
+ output=output,
+ check=False,
+ )
Review Comment:
The PR description/title says Jaeger/Grafana/Prometheus are added as
optional services to the Helm chart, but this code deploys them via `kubectl
apply` of static manifests under `scripts/ci/...` (i.e., Breeze/CI-driven, not
Helm-managed resources). Either update the PR description to reflect that these
backends are provisioned by Breeze for dev/CI, or move them into Helm
templates/values so they are actually part of the chart.
##########
chart/values.yaml:
##########
@@ -3912,10 +3963,15 @@ config:
remote_logging: '{{- ternary "True" "False" (or
.Values.elasticsearch.enabled .Values.opensearch.enabled) }}'
colored_console_log: 'False'
metrics:
- statsd_on: '{{ ternary "True" "False" .Values.statsd.enabled }}'
+ statsd_on: '{{ ternary "True" "False" (and .Values.statsd.enabled (not
.Values.otelCollector.metricsEnabled)) }}'
statsd_port: 9125
statsd_prefix: airflow
statsd_host: '{{ printf "%s-statsd" (include "airflow.fullname" .) }}'
+ otel_on: '{{ ternary "True" "False" .Values.otelCollector.metricsEnabled
}}'
+ otel_host: '{{ if .Values.otelCollector.metricsEnabled }}{{ printf
"%s-otel-collector" (include "airflow.fullname" .) }}{{ end }}'
+ otel_port: '4318'
+ traces:
+ otel_on: '{{ ternary "True" "False" .Values.otelCollector.tracesEnabled }}'
Review Comment:
`metricsEnabled`/`tracesEnabled` can turn on OTel (and disable statsd) even
when `.Values.otelCollector.enabled` is false, which will point Airflow to a
collector Service/Deployment that won’t exist. Gate `otel_on`/`otel_host` and
the `statsd_on` disabling logic on `(and .Values.otelCollector.enabled
.Values.otelCollector.<flag>)`, or hard-fail chart rendering (or schema
validation) when `<flag>` is true but `enabled` is false.
```suggestion
    statsd_on: '{{ ternary "True" "False" (and .Values.statsd.enabled (not (and .Values.otelCollector.enabled .Values.otelCollector.metricsEnabled))) }}'
    statsd_port: 9125
    statsd_prefix: airflow
    statsd_host: '{{ printf "%s-statsd" (include "airflow.fullname" .) }}'
    otel_on: '{{ ternary "True" "False" (and .Values.otelCollector.enabled .Values.otelCollector.metricsEnabled) }}'
    otel_host: '{{ if and .Values.otelCollector.enabled .Values.otelCollector.metricsEnabled }}{{ printf "%s-otel-collector" (include "airflow.fullname" .) }}{{ end }}'
    otel_port: '4318'
  traces:
    otel_on: '{{ ternary "True" "False" (and .Values.otelCollector.enabled .Values.otelCollector.tracesEnabled) }}'
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]