ashb commented on code in PR #64364:
URL: https://github.com/apache/airflow/pull/64364#discussion_r3078598922


##########
providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py:
##########
@@ -225,23 +310,54 @@ def __init__(
         self.mark_end_on_close = True
         self.end_of_log_mark = end_of_log_mark.strip()
         self.write_stdout = write_stdout
+        self.write_to_opensearch = write_to_opensearch
         self.json_format = json_format
         self.json_fields = [label.strip() for label in json_fields.split(",")]
         self.host = self.format_url(host)
         self.host_field = host_field
         self.offset_field = offset_field
+        self.target_index = target_index
         self.index_patterns = index_patterns
         self.index_patterns_callable = index_patterns_callable
         self.context_set = False
-        self.client = OpenSearch(
-            hosts=[{"host": host, "port": port}],
-            http_auth=(username, password),
-            **os_kwargs,
+        self.client = _create_opensearch_client(
+            self.host,
+            port,
+            username,
+            password,
+            cast("dict[str, Any]", os_kwargs),
         )
+        self.delete_local_copy = kwargs.get(
+            "delete_local_copy", conf.getboolean("logging", "delete_local_logs")
+        )
+        self.log_id_template = log_id_template
         self.formatter: logging.Formatter
-        self.handler: logging.FileHandler | logging.StreamHandler
+        self.handler: logging.FileHandler | logging.StreamHandler | None = None
         self._doc_type_map: dict[Any, Any] = {}
         self._doc_type: list[Any] = []
+        self.io = OpensearchRemoteLogIO(
+            host=self.host,
+            port=port,
+            username=username,
+            password=password,
+            write_to_opensearch=self.write_to_opensearch,
+            target_index=self.target_index,
+            write_stdout=self.write_stdout,
+            offset_field=self.offset_field,
+            host_field=self.host_field,
+            base_log_folder=base_log_folder,
+            delete_local_copy=self.delete_local_copy,
+            json_format=self.json_format,
+            log_id_template=self.log_id_template,
+        )
+        if AIRFLOW_V_3_0_PLUS:
+            if AIRFLOW_V_3_2_PLUS:
+                from airflow.logging_config import _ActiveLoggingConfig, get_remote_task_log
+
+                if get_remote_task_log() is None:
+                    _ActiveLoggingConfig.set(self.io, None)

Review Comment:
   Doing this inside the handler feels like a massive abstraction leak. The log 
setup code should be responsible for this, not each and every individual 
handler.
   
   The log setup code already does this: 
https://github.com/apache/airflow/blob/main/task-sdk/src/airflow/sdk/log.py#L174-L187
   
   @Owen-CH-Leung Can one of you fix this up in a follow-up PR please? It 
should not be needed.
   
   Importing from outside of `airflow.sdk` inside a provider is an 
architectural violation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to