ashb commented on code in PR #64364:
URL: https://github.com/apache/airflow/pull/64364#discussion_r3078598922
##########
providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py:
##########
@@ -225,23 +310,54 @@ def __init__(
self.mark_end_on_close = True
self.end_of_log_mark = end_of_log_mark.strip()
self.write_stdout = write_stdout
+ self.write_to_opensearch = write_to_opensearch
self.json_format = json_format
self.json_fields = [label.strip() for label in json_fields.split(",")]
self.host = self.format_url(host)
self.host_field = host_field
self.offset_field = offset_field
+ self.target_index = target_index
self.index_patterns = index_patterns
self.index_patterns_callable = index_patterns_callable
self.context_set = False
- self.client = OpenSearch(
- hosts=[{"host": host, "port": port}],
- http_auth=(username, password),
- **os_kwargs,
+ self.client = _create_opensearch_client(
+ self.host,
+ port,
+ username,
+ password,
+ cast("dict[str, Any]", os_kwargs),
)
+ self.delete_local_copy = kwargs.get(
+ "delete_local_copy", conf.getboolean("logging",
"delete_local_logs")
+ )
+ self.log_id_template = log_id_template
self.formatter: logging.Formatter
- self.handler: logging.FileHandler | logging.StreamHandler
+ self.handler: logging.FileHandler | logging.StreamHandler | None = None
self._doc_type_map: dict[Any, Any] = {}
self._doc_type: list[Any] = []
+ self.io = OpensearchRemoteLogIO(
+ host=self.host,
+ port=port,
+ username=username,
+ password=password,
+ write_to_opensearch=self.write_to_opensearch,
+ target_index=self.target_index,
+ write_stdout=self.write_stdout,
+ offset_field=self.offset_field,
+ host_field=self.host_field,
+ base_log_folder=base_log_folder,
+ delete_local_copy=self.delete_local_copy,
+ json_format=self.json_format,
+ log_id_template=self.log_id_template,
+ )
+ if AIRFLOW_V_3_0_PLUS:
+ if AIRFLOW_V_3_2_PLUS:
+ from airflow.logging_config import _ActiveLoggingConfig,
get_remote_task_log
+
+ if get_remote_task_log() is None:
+ _ActiveLoggingConfig.set(self.io, None)
Review Comment:
Doing this inside the handler feels like a massive abstraction leak. The log
setup code should be responsible for this, not each and every individual
handler — and the log setup code already does this:
https://github.com/apache/airflow/blob/main/task-sdk/src/airflow/sdk/log.py#L174-L187
@Owen-CH-Leung Could one of you fix this up in a follow-up PR, please? It
should not be needed.
Importing from outside of `airflow.sdk` inside a provider is an
architectural violation.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]