jason810496 commented on code in PR #52581:
URL: https://github.com/apache/airflow/pull/52581#discussion_r2175631390


##########
airflow-core/src/airflow/utils/serve_logs.py:
##########
@@ -43,74 +44,55 @@
 logger = logging.getLogger(__name__)
 
 
-def create_app():
-    flask_app = Flask(__name__, static_folder=None)
-    leeway = conf.getint("webserver", "log_request_clock_grace", fallback=30)
-    log_directory = os.path.expanduser(conf.get("logging", "BASE_LOG_FOLDER"))
-    log_config_class = conf.get("logging", "logging_config_class")
-    if log_config_class:
-        logger.info("Detected user-defined logging config. Attempting to load 
%s", log_config_class)
-        try:
-            logging_config = import_string(log_config_class)
-            try:
-                base_log_folder = 
logging_config["handlers"]["task"]["base_log_folder"]
-            except KeyError:
-                base_log_folder = None
-            if base_log_folder is not None:
-                log_directory = base_log_folder
-                logger.info(
-                    "Successfully imported user-defined logging config. Flask 
App will serve log from %s",
-                    log_directory,
-                )
-            else:
-                logger.warning(
-                    "User-defined logging config does not specify 
'base_log_folder'. "
-                    "Flask App will use default log directory %s",
-                    base_log_folder,
-                )
-        except Exception as e:
-            raise ImportError(f"Unable to load {log_config_class} due to 
error: {e}")
-    signer = JWTValidator(
-        issuer=None,
-        secret_key=get_signing_key("api", "secret_key"),
-        algorithm="HS512",
-        leeway=leeway,
-        audience="task-instance-logs",
-    )
+class JWTAuthStaticFiles(StaticFiles):
+    """StaticFiles with JWT authentication."""
 
-    # Prevent direct access to the logs port
-    @flask_app.before_request
-    def validate_pre_signed_url():
+    # reference from 
https://github.com/fastapi/fastapi/issues/858#issuecomment-876564020
+
+    def __init__(self, *args, **kwargs) -> None:
+        super().__init__(*args, **kwargs)
+
+    async def __call__(self, scope, receive, send) -> None:
+        request = Request(scope, receive)
+        await self.validate_jwt_token(request)
+        await super().__call__(scope, receive, send)
+
+    async def validate_jwt_token(self, request: Request):

Review Comment:
   I move the `validate_jwt_token` to new `JWTAuthStaticFiles` class.
   
   Also make it as `async` method, since the unit test will raise some event 
loop issue related to `async_to_sync` if I use the original 
`signer.validated_claims(auth)` instead of `await 
signer.avalidated_claims(auth)`.
   



##########
airflow-core/src/airflow/utils/serve_logs.py:
##########
@@ -119,25 +101,64 @@ def validate_pre_signed_url():
                 get_docs_url("configurations-ref.html#secret-key"),
                 exc_info=True,
             )
-            abort(403)
+            raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, 
detail="Expired signature")
         except InvalidIssuedAtError:
             logger.warning(
-                "The request was issues in the future. Make sure that all 
components "
+                "The request was issued in the future. Make sure that all 
components "
                 "in your system have synchronized clocks. "
                 "See more at %s",
                 get_docs_url("configurations-ref.html#secret-key"),
                 exc_info=True,
             )
-            abort(403)
+            raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, 
detail="Token issued in future")
         except Exception:
             logger.warning("Unknown error", exc_info=True)
-            abort(403)
+            raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, 
detail="Authentication failed")
 
-    @flask_app.route("/log/<path:filename>")
-    def serve_logs_view(filename):
-        return send_from_directory(log_directory, filename, 
mimetype="application/json", as_attachment=False)
 
-    return flask_app
+def create_app():
+    leeway = conf.getint("webserver", "log_request_clock_grace", fallback=30)
+    log_directory = os.path.expanduser(conf.get("logging", "BASE_LOG_FOLDER"))
+    log_config_class = conf.get("logging", "logging_config_class")
+    if log_config_class:
+        logger.info("Detected user-defined logging config. Attempting to load 
%s", log_config_class)
+        try:
+            logging_config = import_string(log_config_class)
+            try:
+                base_log_folder = 
logging_config["handlers"]["task"]["base_log_folder"]
+            except KeyError:
+                base_log_folder = None
+            if base_log_folder is not None:
+                log_directory = base_log_folder
+                logger.info(
+                    "Successfully imported user-defined logging config. 
FastAPI App will serve log from %s",
+                    log_directory,
+                )
+            else:
+                logger.warning(
+                    "User-defined logging config does not specify 
'base_log_folder'. "
+                    "FastAPI App will use default log directory %s",
+                    base_log_folder,
+                )
+        except Exception as e:
+            raise ImportError(f"Unable to load {log_config_class} due to 
error: {e}")
+
+    fastapi_app = FastAPI()
+    fastapi_app.state.signer = JWTValidator(

Review Comment:
   We have to set the `signer` instance in `app.state` just like what we do in 
core-api for `dag_bag` to make it singleton.



##########
airflow-core/src/airflow/utils/serve_logs.py:
##########
@@ -188,8 +209,13 @@ def serve_logs(port=None):
     else:
         bind_option = GunicornOption("bind", f"0.0.0.0:{port}")
 
-    options = [bind_option, GunicornOption("workers", 2)]
-    StandaloneGunicornApplication(wsgi_app, options).run()
+    # Use Uvicorn worker class for ASGI applications
+    options = [
+        bind_option,
+        GunicornOption("workers", 2),
+        GunicornOption("worker_class", "uvicorn.workers.UvicornWorker"),
+    ]
+    StandaloneGunicornApplication(asgi_app, options).run()
 

Review Comment:
   IMO, I'm also think about replace `StandaloneGunicornApplication` with 
`uvicorn.run`. Since the `api_server_command` use `uvicorn.run` to start the 
whole core-api.
   
   Any comment for this ?
   
   
https://github.com/apache/airflow/blob/b5facdfb8f9ff7e7f9b7d586bc8fb4f3c0e3ce35/airflow-core/src/airflow/cli/commands/api_server_command.py#L106-L117



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to