Ma77Ball commented on code in PR #5278: URL: https://github.com/apache/texera/pull/5278#discussion_r3358953354
########## common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/huggingFace/codegen/PythonCodegenBase.scala: ########## @@ -0,0 +1,371 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.operator.huggingFace.codegen + +import org.apache.texera.amber.pybuilder.PythonTemplateBuilder.PythonTemplateBuilderStringContext + +/** + * Builds the Python script emitted by HuggingFaceInferenceOpDesc. + * + * The script defines a `ProcessTableOperator` class with: + * - Per-instance configuration set in `open(self)` from base64-encoded + * values that the `pyb"..."` macro decodes at runtime (so user-input + * strings never appear as raw Python literals in the source). + * - A provider-fallback system that walks the HF Hub's inference-provider + * list cheapest-first and tries each provider's native chat-completions + * route, with HF Inference Router as the default. + * - A `process_table` loop that validates the prompt column, builds the + * per-row payload via the per-task codegen, posts to the resolved + * provider, and parses the response. + * - A `_parse_response` task switch whose branches are provided by the + * per-task codegen. + * + * Per-task variation lives in `TaskCodegen` implementations. This class + * holds only what is shared across all HF tasks; per-task helpers (image + * loading, audio MIME inference, media-URL fetching, etc.) will be added + * in subsequent PRs as the corresponding task families land. + */ +object PythonCodegenBase { + + def render(ctx: CodegenContext, codegen: TaskCodegen): String = { + val payload = codegen.payloadPython(ctx) + val parse = codegen.parsePython(ctx) + val hfApiToken = ctx.hfApiToken + val modelId = ctx.modelId + val promptColumn = ctx.promptColumn + val resultColumn = ctx.resultColumn + val task = ctx.task + val systemPrompt = ctx.systemPrompt + val maxNewTokens = ctx.safeMaxTokens + val temperature = ctx.safeTemp + pyb"""import os + |import re + |import json + |import requests + |import pandas as pd + |from pytexera import * + | + |# Defensive format check for MODEL_ID before it is interpolated into + |# HF URL paths. The base host is hardcoded so the worst case isn't + |# SSRF, but rejecting `..` segments / query strings / fragments / + |# control chars keeps the operator's request shape predictable. + |_HF_MODEL_ID_PATTERN = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._-]*(/[A-Za-z0-9._-]+)+$$") + | + |class ProcessTableOperator(UDFTableOperator): + | + | # Providers ranked cheapest-first (lower index = cheaper). + | # Unknown providers are appended at the end. + | PROVIDER_COST_PRIORITY = [ + | "hf-inference", + | "cerebras", + | "sambanova", + | "groq", + | "novita", + | "nebius", + | "fireworks-ai", + | "together", + | "hyperbolic", + | "scaleway", + | "nscale", + | "ovhcloud", + | "deepinfra", + | "featherless-ai", + | "baseten", + | "publicai", + | "nvidia", + | "openai", + | "cohere", + | "clarifai", + | ] + | + | def open(self): + | # User-provided strings reach the operator via base64-encoded + | # decode expressions so they cannot break Python syntax or + | # leak raw text into the generated source. + | self.HF_API_TOKEN = $hfApiToken + | self.MODEL_ID = $modelId + | self.PROMPT_COLUMN = $promptColumn + | self.RESULT_COLUMN = $resultColumn + | self.TASK = $task + | self.SYSTEM_PROMPT = $systemPrompt + | self.MAX_NEW_TOKENS = $maxNewTokens + | self.TEMPERATURE = $temperature + | + | def _resolve_providers(self, token): + | '''Query the HF Hub API for inference providers serving this model. + | Returns a list of dicts with 'name' and 'providerId' sorted + | cheapest-first. Falls back to hf-inference if anything goes wrong. + | ''' + | try: + | resp = requests.get( + | f"https://huggingface.co/api/models/{self.MODEL_ID}", + | headers={"Authorization": f"Bearer {token}"}, + | params={"expand[]": "inferenceProviderMapping"}, + | timeout=30, + | ) + | if resp.status_code == 200: + | data = resp.json() + | mapping = ( + | data.get("inferenceProviderMapping") + | or data.get("inference_provider_mapping") + | or {} + | ) + | if mapping: + | live = [ + | { + | "name": p, + | "providerId": v.get("providerId", self.MODEL_ID), + | "task": v.get("task", ""), + | "isModelAuthor": v.get("isModelAuthor", False), + | } + | for p, v in mapping.items() + | if isinstance(v, dict) and v.get("status") == "live" + | ] + | if live: + | priority = {name: idx for idx, name in enumerate(self.PROVIDER_COST_PRIORITY)} + | live.sort(key=lambda prov: priority.get(prov["name"], len(self.PROVIDER_COST_PRIORITY))) + | return live + | except Exception: + | pass + | return [{"name": "hf-inference", "providerId": self.MODEL_ID}] + | + | def _post_with_fallback(self, providers, json_headers, pipeline_payload, prompt_value): + | '''Try providers in order, using the correct API route for each. + | Returns (response, provider_summary). provider_summary is None on + | success or a string describing what failed. + | ''' + | RETRYABLE = (400, 404, 422, 429, 502, 503) + | last_resp = None + | errors = [] + | for prov in providers: + | provider_name = prov["name"] + | provider_id = prov["providerId"] + | try: + | if self.TASK == "text-generation": + | chat_routes = { + | "groq": "openai/v1/chat/completions", + | "fireworks-ai": "inference/v1/chat/completions", + | "cohere": "compatibility/v1/chat/completions", + | "clarifai": "v2/ext/openai/v1/chat/completions", + | "deepinfra": "v1/openai/chat/completions", + | } + | route = chat_routes.get(provider_name, "v1/chat/completions") + | url = f"https://router.huggingface.co/{provider_name}/{route}" + | resp = requests.post(url, headers=json_headers, json=pipeline_payload, timeout=120) + | elif provider_name == "hf-inference": + | url = f"https://router.huggingface.co/hf-inference/models/{self.MODEL_ID}" + | resp = requests.post(url, headers=json_headers, json=pipeline_payload, timeout=120) + | else: + | resp = self._call_provider(provider_name, provider_id, json_headers, pipeline_payload, prompt_value) + | except Exception as e: + | errors.append(f"{provider_name}: {type(e).__name__}") + | continue + | if resp.status_code in (200, 201): + | return resp, None + | if resp.status_code == 401: + | return resp, None + | try: + | detail = resp.json().get("error", resp.text[:200]) + | except Exception: + | detail = resp.text[:200] if resp.text else "no details" + | errors.append(f"{provider_name}: HTTP {resp.status_code} - {detail}") + | last_resp = resp + | if resp.status_code not in RETRYABLE: + | return resp, "; ".join(errors) + | summary = "; ".join(errors) if errors else "no providers available" + | return last_resp, summary + | + | def _call_provider(self, provider_name, provider_id, json_headers, pipeline_payload, prompt_value): + | '''Route to a third-party provider using its native API format. + | For the text-gen-only build this covers the OpenAI-compatible chat + | providers and an unknown-provider fallback that tries the pipeline + | format then chat completions. Image / audio / media routing will + | be added in subsequent PRs alongside the corresponding task + | codegens. + | ''' Review Comment: This method is never reached for `text-generation`: the first branch of `_post_with_fallback` (`if self.TASK == "text-generation"`) handles every provider before the `else` that calls `_call_provider`. So it is unexercised by this PR (codecov flags it), and the docstring's claim that it "covers the OpenAI-compatible chat providers" for this build is misleading. It does get used by the stacked image PR, so this is intentional scaffolding. Suggested docstring that says so: ```suggestion | '''Route to a third-party provider using its native API format. | Exercised starting with the image / audio / media task families; | text-generation never reaches this method (its providers are all | handled by the text-generation branch in _post_with_fallback), | so this is intentional scaffolding and shows as uncovered here. | Covers OpenAI-compatible chat providers plus an unknown-provider | fallback that tries the pipeline format then chat completions. | ''' ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
