This is an automated email from the ASF dual-hosted git repository. github-merge-queue[bot] pushed a commit to branch gh-readonly-queue/main/pr-5629-3dab771a2fe3ea5bf97c4c69cfbd761f9cd01e54 in repository https://gitbox.apache.org/repos/asf/texera.git
commit 2cdb1fe20c2fef594e0379e31b349fb4f2899475 Author: ali risheh <[email protected]> AuthorDate: Mon Jun 15 14:59:46 2026 -0700 fix(access-control-service): include port in computing unit pod URI and use Envoy Gateway for distributed CUs (#5629) ### What changes were proposed in this PR? Make the in-cluster address of a computing unit come from a single source of truth — the URI recorded when its pod is created — and ensure that URI is complete (includes the port). This lets the gateway route a user to a computing unit located **anywhere it can reach** (in the local cluster, another cluster, or an external host), instead of being limited to a reconstructed in-cluster address. See #5630. Two related changes: **1. Include the port in the generated pod URI** (`computing-unit-managing-service`) `KubernetesClient.generatePodURI` builds the address stored as the computing unit's `uri` (via `setUri` in `ComputingUnitManagingResource`) and returned to clients as `nodeAddresses`. The pod's container listens on `KubernetesConfig.computeUnitPortNumber` (declared with `withContainerPort(...)` in the same file), but the generated URI omitted the port, so the persisted address was not directly connectable. The port is now appended: ```scala s"...svc.cluster.local:${KubernetesConfig.computeUnitPortNumber}" ``` **2. Route using the recorded URI** (`access-control-service`) `AccessControlResource` previously rebuilt the computing unit's address from `KubernetesConfig` on every authorization request, duplicating the construction logic in `generatePodURI` and pinning every CU to the local cluster. It now reads the URI recorded for the unit and returns it as the `Host` for the gateway to route to. If no URI has been recorded, the unit is not routable and the request is **refused with `403`** (no in-cluster fallback, per review). ### Routing flow The access-control service is the gateway's external authorizer; the `Host` it returns is the upstream to which Envoy forwards the (upgraded) connection. 
Because that host comes from the unit's recorded URI, the same gateway can reach computing units in different locations: ```mermaid flowchart LR FE["Frontend<br/>(/wsapi?cuid=N)"] --> GW["Envoy Gateway"] GW -. "ext-auth: authorize + get Host" .-> ACS["access-control-service"] ACS -- "read recorded uri for CU N" --> DB[("workflow_computing_unit")] ACS -- "Host = recorded uri<br/>(or 403 if none)" --> GW GW == "dynamic forward proxy<br/>to returned Host" ==> R{Where the CU lives} R --> CU1["In-cluster CU pod<br/>computing-unit-N...svc.cluster.local:port"] R --> CU2["CU in another cluster"] R --> CU3["External / remote CU host:port"] ``` ### Any related issues, documentation, discussions? - Closes #5630. - Builds on the Envoy Gateway / ext-auth routing introduced in #4191 (unified Envoy Gateway) and #3598 (access-control-service as the ext-auth service for computing-unit traffic). ### How was this PR tested? Verified on a live deployment (screenshot below). In addition, the new `AccessControlResourceSpec` cases check that the returned `Host` header equals the recorded URI and that the request is refused with `403` when the recorded URI is missing or blank. <img width="1835" height="960" alt="Screenshot from 2026-06-13 13-31-00" src="https://github.com/user-attachments/assets/d56a48f9-b99d-4d36-827a-0a4ce54995fd" /> ### Was this PR authored or co-authored using generative AI tooling? 
Generated-by: Claude Code (Claude Opus 4.8) --------- Co-authored-by: Claude Opus 4.8 (1M context) <[email protected]> --- .../service/resource/AccessControlResource.scala | 29 +++++++--- .../apache/texera/AccessControlResourceSpec.scala | 65 ++++++++++++++++++++-- .../texera/service/util/KubernetesClient.scala | 2 +- 3 files changed, 82 insertions(+), 14 deletions(-) diff --git a/access-control-service/src/main/scala/org/apache/texera/service/resource/AccessControlResource.scala b/access-control-service/src/main/scala/org/apache/texera/service/resource/AccessControlResource.scala index 4f1691287f..792a0dfd8a 100644 --- a/access-control-service/src/main/scala/org/apache/texera/service/resource/AccessControlResource.scala +++ b/access-control-service/src/main/scala/org/apache/texera/service/resource/AccessControlResource.scala @@ -27,8 +27,10 @@ import jakarta.ws.rs.{Consumes, DELETE, GET, POST, PUT, Path, Produces} import org.apache.texera.auth.JwtParser.parseToken import org.apache.texera.auth.SessionUser import org.apache.texera.auth.util.{ComputingUnitAccess, HeaderField} -import org.apache.texera.common.config.{GuiConfig, KubernetesConfig, LLMConfig} +import org.apache.texera.common.config.{GuiConfig, LLMConfig} +import org.apache.texera.dao.SqlServer import org.apache.texera.dao.jooq.generated.enums.PrivilegeEnum +import org.apache.texera.dao.jooq.generated.tables.daos.WorkflowComputingUnitDao import java.net.URLDecoder import java.nio.charset.StandardCharsets @@ -136,12 +138,25 @@ object AccessControlResource extends LazyLogging { } // Dynamic Routing Logic - val workflowComputingUnitPoolName = KubernetesConfig.computeUnitPoolName - val workflowComputingUnitPoolNamespace = KubernetesConfig.computeUnitPoolNamespace - val workflowComputingUnitPoolPort = KubernetesConfig.computeUnitPortNumber - - val targetHost = - s"computing-unit-$cuidInt.$workflowComputingUnitPoolName-svc.$workflowComputingUnitPoolNamespace.svc.cluster.local:$workflowComputingUnitPoolPort" + 
// Route to the URI recorded for the computing unit (written by the managing + // service when the pod is created). This recorded URI is the single source + // of truth for where the unit is reachable, allowing units to live anywhere + // the gateway can route to. If no URI has been recorded, the unit is not + // routable and the connection is refused. + val cuDao = new WorkflowComputingUnitDao( + SqlServer.getInstance().createDSLContext().configuration() + ) + val unit = cuDao.fetchOneByCuid(cuidInt) + val recordedUri = Option(unit).flatMap(u => Option(u.getUri)).map(_.trim).filter(_.nonEmpty) + + val targetHost = recordedUri match { + case Some(uri) => + logger.info(s"Routing CU $cuidInt to recorded host: $uri") + uri + case None => + logger.warn(s"Refusing CU $cuidInt: no URI recorded for the computing unit") + return Response.status(Response.Status.FORBIDDEN).build() + } Response .ok() diff --git a/access-control-service/src/test/scala/org/apache/texera/AccessControlResourceSpec.scala b/access-control-service/src/test/scala/org/apache/texera/AccessControlResourceSpec.scala index 3dfe81d89d..365f5f885f 100644 --- a/access-control-service/src/test/scala/org/apache/texera/AccessControlResourceSpec.scala +++ b/access-control-service/src/test/scala/org/apache/texera/AccessControlResourceSpec.scala @@ -55,6 +55,11 @@ class AccessControlResourceSpec private val testURI: String = "http://localhost:8080/" private val testPath: String = "/api/executions/1/stats/1" + // The host:port the managing service records for a computing unit when it + // creates the pod. The access-control-service routes to this recorded URI. 
+ private val testRecordedUri: String = + "computing-unit-2.compute-unit-svc.default.svc.cluster.local:8888" + private val testUser1: User = { val user = new User() user.setUid(1) @@ -81,6 +86,31 @@ class AccessControlResourceSpec cu.setType(WorkflowComputingUnitTypeEnum.kubernetes) cu.setCuid(2) cu.setName("test-cu") + cu.setUri(testRecordedUri) + cu + } + + // A computing unit the user can access but for which no URI was ever recorded + // (e.g. the pod was never created). Such a unit is not routable and must be + // refused. + private val testCUNoUri: WorkflowComputingUnit = { + val cu = new WorkflowComputingUnit() + cu.setUid(2) + cu.setType(WorkflowComputingUnitTypeEnum.kubernetes) + cu.setCuid(3) + cu.setName("test-cu-no-uri") + cu + } + + // A computing unit whose recorded URI is blank/whitespace-only — also treated + // as "no URI recorded" and refused. + private val testCUBlankUri: WorkflowComputingUnit = { + val cu = new WorkflowComputingUnit() + cu.setUid(2) + cu.setType(WorkflowComputingUnitTypeEnum.kubernetes) + cu.setCuid(4) + cu.setName("test-cu-blank-uri") + cu.setUri(" ") cu } @@ -96,12 +126,18 @@ class AccessControlResourceSpec userDao.insert(testUser1) userDao.insert(testUser2) computingUnitDao.insert(testCU) - - val cuAccess = new ComputingUnitUserAccess() - cuAccess.setUid(testUser1.getUid) - cuAccess.setCuid(testCU.getCuid) - cuAccess.setPrivilege(PrivilegeEnum.WRITE) - computingUnitOfUserDao.insert(cuAccess) + computingUnitDao.insert(testCUNoUri) + computingUnitDao.insert(testCUBlankUri) + + // Grant testUser1 WRITE access to every test computing unit so the routing + // logic (not the access check) is what each routing test exercises. 
+ Seq(testCU, testCUNoUri, testCUBlankUri).foreach { cu => + val cuAccess = new ComputingUnitUserAccess() + cuAccess.setUid(testUser1.getUid) + cuAccess.setCuid(cu.getCuid) + cuAccess.setPrivilege(PrivilegeEnum.WRITE) + computingUnitOfUserDao.insert(cuAccess) + } val claims = JwtAuth.jwtClaims(testUser1, 1) token = JwtAuth.jwtToken(claims) @@ -232,6 +268,23 @@ class AccessControlResourceSpec response.getHeaderString(HeaderField.UserId) shouldBe testUser1.getUid.toString response.getHeaderString(HeaderField.UserName) shouldBe testUser1.getName response.getHeaderString(HeaderField.UserEmail) shouldBe testUser1.getEmail + // Envoy routes by the rewritten Host header, which must be the URI recorded + // for the computing unit. + response.getHeaderString("Host") shouldBe testRecordedUri + } + + it should "refuse the connection when no URI is recorded for the computing unit" in { + val (uri, headers) = mockRequest(testPath, Some(testCUNoUri.getCuid.toString)) + val response = new AccessControlResource().authorizeGet(uri, headers) + + response.getStatus shouldBe Response.Status.FORBIDDEN.getStatusCode + } + + it should "refuse the connection when the recorded URI is blank" in { + val (uri, headers) = mockRequest(testPath, Some(testCUBlankUri.getCuid.toString)) + val response = new AccessControlResource().authorizeGet(uri, headers) + + response.getStatus shouldBe Response.Status.FORBIDDEN.getStatusCode } private def mockRequest( diff --git a/computing-unit-managing-service/src/main/scala/org/apache/texera/service/util/KubernetesClient.scala b/computing-unit-managing-service/src/main/scala/org/apache/texera/service/util/KubernetesClient.scala index 5177ebaf47..4f1d391cb3 100644 --- a/computing-unit-managing-service/src/main/scala/org/apache/texera/service/util/KubernetesClient.scala +++ b/computing-unit-managing-service/src/main/scala/org/apache/texera/service/util/KubernetesClient.scala @@ -35,7 +35,7 @@ object KubernetesClient { private val podNamePrefix = 
"computing-unit" def generatePodURI(cuid: Int): String = { - s"${generatePodName(cuid)}.${KubernetesConfig.computeUnitServiceName}.$namespace.svc.cluster.local" + s"${generatePodName(cuid)}.${KubernetesConfig.computeUnitServiceName}.$namespace.svc.cluster.local:${KubernetesConfig.computeUnitPortNumber}" } def generatePodName(cuid: Int): String = s"$podNamePrefix-$cuid"
