kunwp1 commented on code in PR #4359: URL: https://github.com/apache/texera/pull/4359#discussion_r3113252714
########## amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveManager.scala: ########## @@ -0,0 +1,544 @@ +package org.apache.texera.web.resource.pythonvirtualenvironment +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import java.io.{File, RandomAccessFile} +import java.nio.charset.StandardCharsets +import java.nio.file.{Files, Path, Paths, StandardOpenOption} +import java.util.concurrent.BlockingQueue +import scala.collection.mutable.Map +import scala.jdk.CollectionConverters._ +import scala.sys.process._ + +object PveManager { + + private val VenvRoot: Path = Paths.get("/tmp/texera-pve/venvs") + + private def ensureDirExists(path: Path): Unit = { + if (!Files.exists(path)) Files.createDirectories(path) + } + + private def cuidDir(cuid: Int, pvename: String): Path = { + ensureDirExists(VenvRoot) + val cuIdDir = VenvRoot.resolve(cuid.toString) + ensureDirExists(cuIdDir) + + val dir = cuIdDir.resolve(pvename) + ensureDirExists(dir) + + dir + } + + private def pveDir(cuid: Int, pveName: String): Path = + cuidDir(cuid, pveName).resolve("pve") + + private def pythonBinPath(cuid: Int, pveName: String): Path = + pveDir(cuid, pveName).resolve("bin").resolve("python") + + private def pipBinPath(cuid: Int, pveName: String): Path = + pveDir(cuid, pveName).resolve("bin").resolve("pip") + + private def metadataDir(cuid: Int, pveName: String): Path = + pveDir(cuid, pveName).resolve("metadata") + + private def systemPackagesPath(cuid: Int, pveName: String): Path = + metadataDir(cuid, pveName).resolve("system-packages.txt") + + private def userPackagesPath(cuid: Int, pveName: String): Path = + metadataDir(cuid, pveName).resolve("user-packages.txt") + + private def ensureParentDir(path: Path): Unit = { + val parent = path.getParent + if (parent != null && !Files.exists(parent)) Files.createDirectories(parent) + } + + private def writeMetadata(path: Path, lines: Seq[String]): Unit = { + ensureParentDir(path) + Files.write( + path, + lines.asJava, + StandardOpenOption.CREATE, + StandardOpenOption.TRUNCATE_EXISTING, + StandardOpenOption.WRITE + ) + } + + private def readMetadataList(path: Path): List[String] = { + if (!Files.exists(path)) return Nil + 
Files.readAllLines(path).asScala.map(_.trim).filter(_.nonEmpty).toList + } + + private def parsePackageName(line: String): String = + line.split("==", 2).headOption.getOrElse(line).trim.toLowerCase + + private def pipEnv: Map[String, String] = + Map( + "PYTHONUNBUFFERED" -> "1", + "PIP_PROGRESS_BAR" -> "off", + "PIP_DISABLE_PIP_VERSION_CHECK" -> "1", + "PIP_NO_INPUT" -> "1" + ) + + def pythonBin(cuid: Int, pveName: String): String = + pythonBinPath(cuid, pveName).toAbsolutePath.toString + + def getSystemAndUserPackages(cuid: Int, pveName: String): (Seq[String], Seq[String]) = { + val sys = readMetadataList(systemPackagesPath(cuid, pveName)) + val usr = readMetadataList(userPackagesPath(cuid, pveName)) + (sys, usr) + } + + def deletePackages(cuid: Int, packageName: String, pveName: String): List[String] = { + val pipPath = pipBinPath(cuid, pveName).toAbsolutePath + val userFile = userPackagesPath(cuid, pveName) + val systemFile = systemPackagesPath(cuid, pveName) + + val systemPackages: Set[String] = + readMetadataList(systemFile).map(parsePackageName).toSet + + val normalizedName = packageName.toLowerCase + if (systemPackages.contains(normalizedName)) { + return List(s"ERROR: '$packageName' is a system package and cannot be deleted.") + } + + if (!Files.exists(pipPath)) { + val msg = s"[PveManager] No pip found at $pipPath — PVE may not exist or is not initialized." 
+ println(msg) + return List(msg) + } + + try { + val command = Seq(pipPath.toString, "uninstall", "-y", packageName) + + val logger = ProcessLogger( + (out: String) => println(s"[pip] $out"), + (err: String) => System.err.println(s"[pip][ERR] $err") + ) + + val exitCode = command.!(logger) + + if (exitCode == 0) { + val existing = readMetadataList(userFile) + val updated = + existing.filterNot(line => parsePackageName(line) == normalizedName).sorted + + writeMetadata(userFile, updated) + + List(s"Exit code: $exitCode", s"Uninstalled $packageName successfully") + } else { + List(s"[PveManager] pip uninstall for '$packageName' failed with exit code $exitCode") + } + } catch { + case e: Exception => + List(s"[PveManager] Failed to delete package for cuid=$cuid: ${e.getMessage}") + } + } + + private def tailFileToQueue( + file: File, + queue: BlockingQueue[String], + prefix: String = "[pip] " + ): AutoCloseable = { + val raf = new RandomAccessFile(file, "r") + raf.seek(raf.length()) + @volatile var running = true + + val thread = new Thread(() => { + var buf = new StringBuilder + val charset = StandardCharsets.UTF_8 + try { + while (running) { + val available = raf.length() - raf.getFilePointer + if (available > 0) { + val bytes = new Array[Byte](math.min(available, 8192).toInt) + val n = raf.read(bytes) + if (n > 0) { + buf.append(new String(bytes, 0, n, charset)) + var newlineIndex = buf.indexOf("\n") + while (newlineIndex >= 0) { + val line = buf.substring(0, newlineIndex).trim + if (shouldStream(line)) queue.put(prefix + line) + buf = buf.delete(0, newlineIndex + 1) + newlineIndex = buf.indexOf("\n") + } + } + } else { + Thread.sleep(100) + } + } + val last = buf.result().trim + if (shouldStream(last)) queue.put(prefix + last) + } catch { + case _: InterruptedException => () + case e: Exception => queue.put(s"[pip][ERR] tail exception: ${e.getMessage}") + } finally { + raf.close() + } + }) + + thread.setDaemon(true) + thread.start() + + new AutoCloseable { + 
override def close(): Unit = { + running = false + thread.interrupt() + thread.join(500) + } + } + } + + private def shouldStream(line: String): Boolean = { + val s = line.trim + if (s.isEmpty) return false + + val lower = s.toLowerCase + + if (lower.contains("found link")) return false + if (lower.contains("skipping link")) return false + if (lower.contains("cache")) return false + if (lower.contains("caching")) return false + + true + } + + private def runPipWithLog( + cmd: Seq[String], + env: Map[String, String], + queue: BlockingQueue[String] + ): Int = { + val logFile = File.createTempFile("pip-live-", ".log") + val fullCmd = if (cmd.contains("--log")) cmd else cmd ++ Seq("--log", logFile.getAbsolutePath) + + val tailer = tailFileToQueue(logFile, queue) + + val logger = ProcessLogger( + out => if (shouldStream(out)) queue.put(s"[pip/stdout] $out"), + err => if (shouldStream(err)) queue.put(s"[pip/stderr] $err") + ) + + val proc = Process(fullCmd, None, env.toSeq: _*).run(logger) + val exitCode = proc.exitValue() + + try tailer.close() + catch { case _: Throwable => () } + + queue.put(s"[pip] (log at ${logFile.getAbsolutePath})") + exitCode + } + + def createNewPve(cuid: Int, queue: BlockingQueue[String], pveName: String): Unit = { + queue.put(s"[PVE] Creating new PVE for cuid=$cuid with name=$pveName") + + val venvDirPath = pveDir(cuid, pveName).toAbsolutePath + ensureDirExists(cuidDir(cuid, pveName)) + + val python = pythonBinPath(cuid, pveName).toAbsolutePath.toString + val envVars = pipEnv + + val Requirements: String = + """wheel==0.41.2 Review Comment: It's not a good idea to hardcode all the python packages because it's hard to sync with the latest requirements. 
########## amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveManager.scala: ########## @@ -0,0 +1,544 @@ +package org.apache.texera.web.resource.pythonvirtualenvironment +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.{File, RandomAccessFile} +import java.nio.charset.StandardCharsets +import java.nio.file.{Files, Path, Paths, StandardOpenOption} +import java.util.concurrent.BlockingQueue +import scala.collection.mutable.Map +import scala.jdk.CollectionConverters._ +import scala.sys.process._ + +object PveManager { + + private val VenvRoot: Path = Paths.get("/tmp/texera-pve/venvs") + + private def ensureDirExists(path: Path): Unit = { + if (!Files.exists(path)) Files.createDirectories(path) + } + + private def cuidDir(cuid: Int, pvename: String): Path = { + ensureDirExists(VenvRoot) Review Comment: I think the VenvRoot and all the subdirectories except PVE related directories should be created when a computing unit is created. The lifecycle of those directories is not clear in this PR. 
########## amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveManager.scala: ########## Review Comment: Add a comment explaining what this manager is, and also add a comment to each method that is too long. ########## frontend/src/app/workspace/component/power-button/computing-unit-selection.component.ts: ########## @@ -56,6 +56,27 @@ import { isComputingUnitShmTooLarge, getJvmMemorySliderConfig, } from "../../../common/util/computing-unit.util"; +import { PvePackageResponse, WorkflowPveService } from "../../service/virtual-environment/virtual-environment.service"; + +type PackageRow = { + name: string; + operator?: "==" | ">=" | "<="; + version?: string; + isHighlighted?: boolean; Review Comment: Maybe rename it to `delete` or `deleteToggle`? ########## amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveManager.scala: ########## @@ -0,0 +1,544 @@ +package org.apache.texera.web.resource.pythonvirtualenvironment +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import java.io.{File, RandomAccessFile} +import java.nio.charset.StandardCharsets +import java.nio.file.{Files, Path, Paths, StandardOpenOption} +import java.util.concurrent.BlockingQueue +import scala.collection.mutable.Map +import scala.jdk.CollectionConverters._ +import scala.sys.process._ + +object PveManager { + + private val VenvRoot: Path = Paths.get("/tmp/texera-pve/venvs") + + private def ensureDirExists(path: Path): Unit = { + if (!Files.exists(path)) Files.createDirectories(path) + } + + private def cuidDir(cuid: Int, pvename: String): Path = { + ensureDirExists(VenvRoot) + val cuIdDir = VenvRoot.resolve(cuid.toString) + ensureDirExists(cuIdDir) + + val dir = cuIdDir.resolve(pvename) + ensureDirExists(dir) + + dir + } + + private def pveDir(cuid: Int, pveName: String): Path = + cuidDir(cuid, pveName).resolve("pve") + + private def pythonBinPath(cuid: Int, pveName: String): Path = + pveDir(cuid, pveName).resolve("bin").resolve("python") + + private def pipBinPath(cuid: Int, pveName: String): Path = + pveDir(cuid, pveName).resolve("bin").resolve("pip") + + private def metadataDir(cuid: Int, pveName: String): Path = + pveDir(cuid, pveName).resolve("metadata") + + private def systemPackagesPath(cuid: Int, pveName: String): Path = + metadataDir(cuid, pveName).resolve("system-packages.txt") + + private def userPackagesPath(cuid: Int, pveName: String): Path = + metadataDir(cuid, pveName).resolve("user-packages.txt") + + private def ensureParentDir(path: Path): Unit = { Review Comment: Consider removing `ensureParentDir` and `ensureDirExists`. I don't think we need to check whether the directory exists or not. ########## amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveManager.scala: ########## @@ -0,0 +1,544 @@ +package org.apache.texera.web.resource.pythonvirtualenvironment +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.{File, RandomAccessFile} +import java.nio.charset.StandardCharsets +import java.nio.file.{Files, Path, Paths, StandardOpenOption} +import java.util.concurrent.BlockingQueue +import scala.collection.mutable.Map +import scala.jdk.CollectionConverters._ +import scala.sys.process._ + +object PveManager { + + private val VenvRoot: Path = Paths.get("/tmp/texera-pve/venvs") + + private def ensureDirExists(path: Path): Unit = { + if (!Files.exists(path)) Files.createDirectories(path) + } + + private def cuidDir(cuid: Int, pvename: String): Path = { + ensureDirExists(VenvRoot) + val cuIdDir = VenvRoot.resolve(cuid.toString) + ensureDirExists(cuIdDir) Review Comment: You don't need to run createDirectories step-by-step. Can simply run a single createDirectories call. 
########## frontend/src/app/workspace/component/power-button/computing-unit-selection.component.ts: ########## @@ -637,4 +670,281 @@ export class ComputingUnitSelectionComponent implements OnInit { this.computingUnitStatusService.refreshComputingUnitList(); } } + + private makeEmptyPve(expanded: boolean): PveDraft { + return { + id: this.nextPveId++, + name: "", + userPackages: [], + newPackages: [{ name: "", operator: "==", version: "" }], + deletingPackages: [], + pipOutput: "", + prettyPipOutput: "", + expanded, + isInstalling: false, + }; + } + + addEnvironment(): void { + this.pves.push(this.makeEmptyPve(true)); + } + + trackByPveId(_index: number, pve: PveDraft): number { + return pve.id; + } + + showPVEmodalVisible(): void { + this.pveModalVisible = true; + this.getPVEs(); + } + + getPVEs(): void { + const cuId: number | undefined = this.selectedComputingUnit?.computingUnit.cuid; + + if (cuId == null) { + this.notificationService.error("No computing unit selected. Please select a CU first."); Review Comment: I don't see a way to set cuId == null in the frontend. What's a test scenario to test this error? 
########## frontend/src/app/workspace/component/power-button/computing-unit-selection.component.ts: ########## @@ -637,4 +670,281 @@ export class ComputingUnitSelectionComponent implements OnInit { this.computingUnitStatusService.refreshComputingUnitList(); } } + + private makeEmptyPve(expanded: boolean): PveDraft { + return { + id: this.nextPveId++, + name: "", + userPackages: [], + newPackages: [{ name: "", operator: "==", version: "" }], + deletingPackages: [], + pipOutput: "", + prettyPipOutput: "", + expanded, + isInstalling: false, + }; + } + + addEnvironment(): void { + this.pves.push(this.makeEmptyPve(true)); + } + + trackByPveId(_index: number, pve: PveDraft): number { + return pve.id; + } + + showPVEmodalVisible(): void { + this.pveModalVisible = true; + this.getPVEs(); + } + + getPVEs(): void { + const cuId: number | undefined = this.selectedComputingUnit?.computingUnit.cuid; + + if (cuId == null) { + this.notificationService.error("No computing unit selected. Please select a CU first."); + return; + } + + this.workflowPveService.setCuid(cuId); + + this.workflowPveService + .fetchPVEs(cuId) + .pipe(untilDestroyed(this)) + .subscribe({ + next: (resp: PvePackageResponse[]) => { + this.pves = resp.map((pve, index) => ({ + id: index, + name: pve.pveName, + expanded: false, + isInstalling: false, + pipOutput: "", + prettyPipOutput: "", + userPackages: (pve.userPackages ?? []).map(pkg => { + const [name, version] = pkg.split("=="); + return { + name: name.trim(), + version: (version ?? 
"").trim(), + isHighlighted: false, + }; + }), + newPackages: [], + deletingPackages: [], + })); + }, + error: (err: unknown) => { + console.error("Failed to fetch PVEs:", err); + this.pves = []; + }, + }); + } + + addPackage(index: number): void { + const env = this.pves[index]; + env.newPackages.push({ name: "", version: "", operator: undefined, isHighlighted: false }); + } + + togglePackageDelete(index: number, pkg: PackageRow): void { + const env = this.pves[index]; + + pkg.isHighlighted = !pkg.isHighlighted; + + const version = pkg.version ?? ""; + + if (pkg.isHighlighted) { + const exists = env.deletingPackages.some(p => p.name === pkg.name && (p.version ?? "") === version); + if (!exists) { + env.deletingPackages.push({ name: pkg.name, version }); + } + } else { + env.deletingPackages = env.deletingPackages.filter(p => !(p.name === pkg.name && (p.version ?? "") === version)); + } + } + + scrollToBottomOfPipModal(index: number) { + setTimeout(() => { + const pre = document.getElementById(`pip-log-${index}`) as HTMLElement | null; + if (pre) { + pre.scrollTop = pre.scrollHeight; + } + }, 50); + } + + updatePrettyPipOutput(index: number) { + const env = this.pves[index]; + + const escapeHtml = (s: string) => + s + .replace(/&/g, "&amp;") + .replace(/</g, "&lt;") + .replace(/>/g, "&gt;") + .replace(/"/g, "&quot;") + .replace(/'/g, "&#39;"); + + const raw = env.pipOutput ??
""; + const safe = escapeHtml(raw); + + env.prettyPipOutput = safe + .replace( + /^(\[pip\].*finished with exit code\s+0.*)$/gm, + "<span class=\"pip-exit ok\"><strong>$1</strong></span>" + ) + .replace( + /^(\[pip\].*finished with exit code\s+1.*)$/gm, + "<span class=\"pip-exit err\"><strong>$1</strong></span>" + ) + .replace( + /^(\[pip\].*finished with exit code\s+([2-9]\d*).*)$/gm, + "<span class=\"pip-exit err\"><strong>$1</strong></span>" + ) + .replace(/ERROR/g, "<span class=\"error\">ERROR</span>") + .replace(/WARNING/g, "<span class=\"warning\">WARNING</span>") + .replace(/already satisfied/g, "<span class=\"success\">already satisfied</span>") + .replace(/\n/g, "<br/>"); + } + + createVirtualEnvironment(index: number): void { + const cuId = this.selectedComputingUnit?.computingUnit.cuid; + const env = this.pves[index]; + + if (cuId == null) { Review Comment: Same here. ########## amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveResource.scala: ########## Review Comment: Check if there are any dead codes in this file. ########## amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveResource.scala: ########## @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.web.resource.pythonvirtualenvironment + +import com.fasterxml.jackson.databind.ObjectMapper +import org.glassfish.jersey.server.ChunkedOutput + +import java.util.concurrent.LinkedBlockingQueue +import javax.ws.rs._ +import javax.ws.rs.core.MediaType +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.Future +import scala.jdk.CollectionConverters._ +import java.util + +case class PackageResponse(system: java.util.List[String], user: java.util.List[String]) + +@Path("/pve") +@Consumes(Array(MediaType.APPLICATION_JSON)) +class PveResource { + + // -------------------------------------------------- + // Create / Install packages (SSE) + // -------------------------------------------------- + @GET + @Produces(Array("text/event-stream")) + def createPve( + @QueryParam("packages") packagesJson: String, + @QueryParam("cuid") cuid: Int, + @QueryParam("pveName") pveName: String + ): ChunkedOutput[String] = { + + if (cuid == 0) { + throw new BadRequestException("Missing or invalid cuid") + } + + if (pveName == null || pveName.trim.isEmpty) { + throw new BadRequestException("Missing environment name") + } + + val mapper = new ObjectMapper() + val packages = + if (packagesJson == null || packagesJson.trim.isEmpty) + List.empty[String] + else + mapper.readValue(packagesJson, classOf[Array[String]]).toList + + val queue = new LinkedBlockingQueue[String]() + val chunkedOutput = new ChunkedOutput[String](classOf[String]) + + Future { + try { + + if (!PveManager.pveExists(cuid, pveName)) { + PveManager.createNewPve(cuid, queue, pveName) + } + + if (packages.nonEmpty) { + PveManager.installPackages(packages, cuid, queue, pveName) + } + + } catch { + case e: Exception => + queue.put(s"[ERR] ${e.getMessage}") + } finally { + queue.put("__DONE__") + } + } + + Future { + var done = false + while (!done) { + val line = 
queue.take() + if (line == "__DONE__") { + chunkedOutput.write("data: __DONE__\n\n") + done = true + } else chunkedOutput.write(s"data: $line\n\n") + } + chunkedOutput.close() + } + + chunkedOutput + } + + // -------------------------------------------------- + // Get installed packages + // -------------------------------------------------- + @GET + @Path("/packages") + @Produces(Array(MediaType.APPLICATION_JSON)) + def getInstalledPackages( + @QueryParam("cuid") cuid: Int, + @QueryParam("pveName") pveName: String + ): util.Map[String, util.List[String]] = { + + if (cuid == 0) { + throw new BadRequestException("Missing or invalid cuid") + } + + if (pveName == null || pveName.trim.isEmpty) { Review Comment: Is there a scenario where pveName can be empty when the backend is executing? This might be dead code. ########## amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveManager.scala: ########## @@ -0,0 +1,544 @@ +package org.apache.texera.web.resource.pythonvirtualenvironment +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import java.io.{File, RandomAccessFile} +import java.nio.charset.StandardCharsets +import java.nio.file.{Files, Path, Paths, StandardOpenOption} +import java.util.concurrent.BlockingQueue +import scala.collection.mutable.Map +import scala.jdk.CollectionConverters._ +import scala.sys.process._ + +object PveManager { + + private val VenvRoot: Path = Paths.get("/tmp/texera-pve/venvs") + + private def ensureDirExists(path: Path): Unit = { + if (!Files.exists(path)) Files.createDirectories(path) + } + + private def cuidDir(cuid: Int, pvename: String): Path = { + ensureDirExists(VenvRoot) + val cuIdDir = VenvRoot.resolve(cuid.toString) + ensureDirExists(cuIdDir) + + val dir = cuIdDir.resolve(pvename) + ensureDirExists(dir) + + dir + } + + private def pveDir(cuid: Int, pveName: String): Path = + cuidDir(cuid, pveName).resolve("pve") + + private def pythonBinPath(cuid: Int, pveName: String): Path = + pveDir(cuid, pveName).resolve("bin").resolve("python") + + private def pipBinPath(cuid: Int, pveName: String): Path = + pveDir(cuid, pveName).resolve("bin").resolve("pip") + + private def metadataDir(cuid: Int, pveName: String): Path = + pveDir(cuid, pveName).resolve("metadata") + + private def systemPackagesPath(cuid: Int, pveName: String): Path = + metadataDir(cuid, pveName).resolve("system-packages.txt") + + private def userPackagesPath(cuid: Int, pveName: String): Path = + metadataDir(cuid, pveName).resolve("user-packages.txt") + + private def ensureParentDir(path: Path): Unit = { + val parent = path.getParent + if (parent != null && !Files.exists(parent)) Files.createDirectories(parent) + } + + private def writeMetadata(path: Path, lines: Seq[String]): Unit = { + ensureParentDir(path) + Files.write( + path, + lines.asJava, + StandardOpenOption.CREATE, + StandardOpenOption.TRUNCATE_EXISTING, + StandardOpenOption.WRITE + ) + } + + private def readMetadataList(path: Path): List[String] = { + if (!Files.exists(path)) return Nil + 
Files.readAllLines(path).asScala.map(_.trim).filter(_.nonEmpty).toList + } + + private def parsePackageName(line: String): String = + line.split("==", 2).headOption.getOrElse(line).trim.toLowerCase + + private def pipEnv: Map[String, String] = + Map( + "PYTHONUNBUFFERED" -> "1", + "PIP_PROGRESS_BAR" -> "off", + "PIP_DISABLE_PIP_VERSION_CHECK" -> "1", + "PIP_NO_INPUT" -> "1" + ) + + def pythonBin(cuid: Int, pveName: String): String = + pythonBinPath(cuid, pveName).toAbsolutePath.toString + + def getSystemAndUserPackages(cuid: Int, pveName: String): (Seq[String], Seq[String]) = { + val sys = readMetadataList(systemPackagesPath(cuid, pveName)) + val usr = readMetadataList(userPackagesPath(cuid, pveName)) + (sys, usr) + } + + def deletePackages(cuid: Int, packageName: String, pveName: String): List[String] = { + val pipPath = pipBinPath(cuid, pveName).toAbsolutePath + val userFile = userPackagesPath(cuid, pveName) + val systemFile = systemPackagesPath(cuid, pveName) + + val systemPackages: Set[String] = + readMetadataList(systemFile).map(parsePackageName).toSet + + val normalizedName = packageName.toLowerCase + if (systemPackages.contains(normalizedName)) { + return List(s"ERROR: '$packageName' is a system package and cannot be deleted.") + } + + if (!Files.exists(pipPath)) { + val msg = s"[PveManager] No pip found at $pipPath — PVE may not exist or is not initialized." 
+ println(msg) + return List(msg) + } + + try { + val command = Seq(pipPath.toString, "uninstall", "-y", packageName) + + val logger = ProcessLogger( + (out: String) => println(s"[pip] $out"), + (err: String) => System.err.println(s"[pip][ERR] $err") + ) + + val exitCode = command.!(logger) + + if (exitCode == 0) { + val existing = readMetadataList(userFile) + val updated = + existing.filterNot(line => parsePackageName(line) == normalizedName).sorted + + writeMetadata(userFile, updated) + + List(s"Exit code: $exitCode", s"Uninstalled $packageName successfully") + } else { + List(s"[PveManager] pip uninstall for '$packageName' failed with exit code $exitCode") + } + } catch { + case e: Exception => + List(s"[PveManager] Failed to delete package for cuid=$cuid: ${e.getMessage}") + } + } + + private def tailFileToQueue( + file: File, + queue: BlockingQueue[String], + prefix: String = "[pip] " + ): AutoCloseable = { + val raf = new RandomAccessFile(file, "r") + raf.seek(raf.length()) + @volatile var running = true + + val thread = new Thread(() => { + var buf = new StringBuilder + val charset = StandardCharsets.UTF_8 + try { + while (running) { + val available = raf.length() - raf.getFilePointer + if (available > 0) { + val bytes = new Array[Byte](math.min(available, 8192).toInt) + val n = raf.read(bytes) + if (n > 0) { + buf.append(new String(bytes, 0, n, charset)) + var newlineIndex = buf.indexOf("\n") + while (newlineIndex >= 0) { + val line = buf.substring(0, newlineIndex).trim + if (shouldStream(line)) queue.put(prefix + line) + buf = buf.delete(0, newlineIndex + 1) + newlineIndex = buf.indexOf("\n") + } + } + } else { + Thread.sleep(100) + } + } + val last = buf.result().trim + if (shouldStream(last)) queue.put(prefix + last) + } catch { + case _: InterruptedException => () + case e: Exception => queue.put(s"[pip][ERR] tail exception: ${e.getMessage}") + } finally { + raf.close() + } + }) + + thread.setDaemon(true) + thread.start() + + new AutoCloseable { + 
override def close(): Unit = { + running = false + thread.interrupt() + thread.join(500) + } + } + } + + private def shouldStream(line: String): Boolean = { + val s = line.trim + if (s.isEmpty) return false + + val lower = s.toLowerCase + + if (lower.contains("found link")) return false + if (lower.contains("skipping link")) return false + if (lower.contains("cache")) return false + if (lower.contains("caching")) return false + + true + } + + private def runPipWithLog( + cmd: Seq[String], + env: Map[String, String], + queue: BlockingQueue[String] + ): Int = { + val logFile = File.createTempFile("pip-live-", ".log") + val fullCmd = if (cmd.contains("--log")) cmd else cmd ++ Seq("--log", logFile.getAbsolutePath) + + val tailer = tailFileToQueue(logFile, queue) + + val logger = ProcessLogger( + out => if (shouldStream(out)) queue.put(s"[pip/stdout] $out"), + err => if (shouldStream(err)) queue.put(s"[pip/stderr] $err") + ) + + val proc = Process(fullCmd, None, env.toSeq: _*).run(logger) + val exitCode = proc.exitValue() + + try tailer.close() + catch { case _: Throwable => () } + + queue.put(s"[pip] (log at ${logFile.getAbsolutePath})") + exitCode + } + + def createNewPve(cuid: Int, queue: BlockingQueue[String], pveName: String): Unit = { Review Comment: Add comment of this method. It's too long to understand. ########## amber/src/main/scala/org/apache/texera/web/TexeraWebApplication.scala: ########## @@ -160,7 +161,7 @@ class TexeraWebApplication environment.jersey.register(classOf[UserQuotaResource]) environment.jersey.register(classOf[AdminSettingsResource]) environment.jersey.register(classOf[AIAssistantResource]) - Review Comment: Does the creation of PVE involve Texera Web Application? 
########## bin/k8s/templates/gateway-routes.yaml: ########## @@ -100,27 +101,47 @@ spec: parentRefs: - name: {{ .Release.Name }}-gateway {{- if and .Values.gatewayConfig .Values.gatewayConfig.hostname }} - hostnames: - - {{ .Values.gatewayConfig.hostname }} +hostnames: + - {{ .Values.gatewayConfig.hostname }} {{- end }} - rules: - - matches: - - path: - type: PathPrefix - value: /wsapi - - path: - type: RegularExpression - value: "^/api/executions/\\d+/stats/\\d+$" - - path: - type: PathPrefix - value: /api/executions/result/export - backendRefs: - - group: gateway.envoyproxy.io - kind: Backend - name: texera-dynamic-backend +rules: + - matches: + - path: + type: PathPrefix + value: /pve + timeouts: + request: 10m Review Comment: Why do we need timeouts and why is it 10 minutes? ########## frontend/src/app/common/type/workflow-computing-unit.ts: ########## @@ -50,6 +50,7 @@ export interface DashboardWorkflowComputingUnit { metrics: WorkflowComputingUnitMetrics; isOwner: boolean; accessPrivilege: "READ" | "WRITE" | "NONE"; + packages: string[]; Review Comment: Do we need this change? If so, can you add a comment? ########## bin/k8s/templates/gateway-routes.yaml: ########## Review Comment: Please remove all the unnecessary diffs from this file. I see so many of them. 
########## frontend/src/app/workspace/component/power-button/computing-unit-selection.component.ts: ########## @@ -637,4 +670,281 @@ export class ComputingUnitSelectionComponent implements OnInit { this.computingUnitStatusService.refreshComputingUnitList(); } } + + private makeEmptyPve(expanded: boolean): PveDraft { + return { + id: this.nextPveId++, + name: "", + userPackages: [], + newPackages: [{ name: "", operator: "==", version: "" }], + deletingPackages: [], + pipOutput: "", + prettyPipOutput: "", + expanded, + isInstalling: false, + }; + } + + addEnvironment(): void { + this.pves.push(this.makeEmptyPve(true)); + } + + trackByPveId(_index: number, pve: PveDraft): number { Review Comment: Remove this? ########## amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveResource.scala: ########## @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.texera.web.resource.pythonvirtualenvironment + +import com.fasterxml.jackson.databind.ObjectMapper +import org.glassfish.jersey.server.ChunkedOutput + +import java.util.concurrent.LinkedBlockingQueue +import javax.ws.rs._ +import javax.ws.rs.core.MediaType +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.Future +import scala.jdk.CollectionConverters._ +import java.util + +case class PackageResponse(system: java.util.List[String], user: java.util.List[String]) + +@Path("/pve") +@Consumes(Array(MediaType.APPLICATION_JSON)) +class PveResource { + + // -------------------------------------------------- + // Create / Install packages (SSE) + // -------------------------------------------------- + @GET + @Produces(Array("text/event-stream")) + def createPve( + @QueryParam("packages") packagesJson: String, + @QueryParam("cuid") cuid: Int, + @QueryParam("pveName") pveName: String + ): ChunkedOutput[String] = { + + if (cuid == 0) { + throw new BadRequestException("Missing or invalid cuid") + } + + if (pveName == null || pveName.trim.isEmpty) { + throw new BadRequestException("Missing environment name") + } + + val mapper = new ObjectMapper() + val packages = + if (packagesJson == null || packagesJson.trim.isEmpty) + List.empty[String] + else + mapper.readValue(packagesJson, classOf[Array[String]]).toList + + val queue = new LinkedBlockingQueue[String]() + val chunkedOutput = new ChunkedOutput[String](classOf[String]) + + Future { + try { + + if (!PveManager.pveExists(cuid, pveName)) { + PveManager.createNewPve(cuid, queue, pveName) + } + + if (packages.nonEmpty) { + PveManager.installPackages(packages, cuid, queue, pveName) + } + + } catch { + case e: Exception => + queue.put(s"[ERR] ${e.getMessage}") + } finally { + queue.put("__DONE__") + } + } + + Future { + var done = false + while (!done) { + val line = queue.take() + if (line == "__DONE__") { + chunkedOutput.write("data: __DONE__\n\n") + done = true + } 
else chunkedOutput.write(s"data: $line\n\n") + } + chunkedOutput.close() + } + + chunkedOutput + } + + // -------------------------------------------------- + // Get installed packages + // -------------------------------------------------- + @GET + @Path("/packages") + @Produces(Array(MediaType.APPLICATION_JSON)) + def getInstalledPackages( + @QueryParam("cuid") cuid: Int, + @QueryParam("pveName") pveName: String + ): util.Map[String, util.List[String]] = { + + if (cuid == 0) { + throw new BadRequestException("Missing or invalid cuid") + } + + if (pveName == null || pveName.trim.isEmpty) { + throw new BadRequestException("Missing environment name") + } + + try { + + println(s"[PVE] HIT getInstalledPackages cuid=$cuid") + + val (systemPkgsRaw, userPkgsRaw) = PveManager.getSystemAndUserPackages(cuid, pveName) + + println(s"[PVE] raw systemPkgsRaw isNull=${systemPkgsRaw == null} value=$systemPkgsRaw") + println(s"[PVE] raw userPkgsRaw isNull=${userPkgsRaw == null} value=$userPkgsRaw") + + val systemPkgs = Option(systemPkgsRaw).getOrElse(Seq.empty[String]).toList.asJava + val userPkgs = Option(userPkgsRaw).getOrElse(Seq.empty[String]).toList.asJava + + val resp = Map("system" -> systemPkgs, "user" -> userPkgs).asJava + + println( + s"[PVE] returning keys=${resp.keySet()} systemSize=${systemPkgs.size()} userSize=${userPkgs.size()}" + ) + + resp + + } catch { + case e: Exception => + e.printStackTrace() + Map( + "system" -> List(s"Error: ${e.getMessage}").asJava, + "user" -> List.empty[String].asJava + ).asJava + } + } + + // -------------------------------------------------- + // Fetch PVEs + // -------------------------------------------------- + @GET + @Path("/pves") + @Produces(Array(MediaType.APPLICATION_JSON)) + def fetchPVEs(@QueryParam("cuid") cuid: Int): util.List[util.Map[String, Object]] = { + + if (cuid == 0) { Review Comment: Is there a scenario where cuid can be 0? 
########## frontend/src/app/workspace/component/power-button/computing-unit-selection.component.ts: ########## @@ -637,4 +670,281 @@ export class ComputingUnitSelectionComponent implements OnInit { this.computingUnitStatusService.refreshComputingUnitList(); } } + + private makeEmptyPve(expanded: boolean): PveDraft { + return { + id: this.nextPveId++, + name: "", + userPackages: [], + newPackages: [{ name: "", operator: "==", version: "" }], + deletingPackages: [], + pipOutput: "", + prettyPipOutput: "", + expanded, + isInstalling: false, + }; + } + + addEnvironment(): void { + this.pves.push(this.makeEmptyPve(true)); + } + + trackByPveId(_index: number, pve: PveDraft): number { + return pve.id; + } + + showPVEmodalVisible(): void { + this.pveModalVisible = true; + this.getPVEs(); + } + + getPVEs(): void { + const cuId: number | undefined = this.selectedComputingUnit?.computingUnit.cuid; + + if (cuId == null) { + this.notificationService.error("No computing unit selected. Please select a CU first."); + return; + } + + this.workflowPveService.setCuid(cuId); + + this.workflowPveService + .fetchPVEs(cuId) + .pipe(untilDestroyed(this)) + .subscribe({ + next: (resp: PvePackageResponse[]) => { + this.pves = resp.map((pve, index) => ({ + id: index, + name: pve.pveName, + expanded: false, + isInstalling: false, + pipOutput: "", + prettyPipOutput: "", + userPackages: (pve.userPackages ?? []).map(pkg => { + const [name, version] = pkg.split("=="); + return { + name: name.trim(), + version: (version ?? 
"").trim(), + isHighlighted: false, + }; + }), + newPackages: [], + deletingPackages: [], + })); + }, + error: (err: unknown) => { + console.error("Failed to fetch PVEs:", err); + this.pves = []; + }, + }); + } + + addPackage(index: number): void { + const env = this.pves[index]; + env.newPackages.push({ name: "", version: "", operator: undefined, isHighlighted: false }); + } + + togglePackageDelete(index: number, pkg: PackageRow): void { + const env = this.pves[index]; + + pkg.isHighlighted = !pkg.isHighlighted; + + const version = pkg.version ?? ""; + + if (pkg.isHighlighted) { + const exists = env.deletingPackages.some(p => p.name === pkg.name && (p.version ?? "") === version); + if (!exists) { + env.deletingPackages.push({ name: pkg.name, version }); + } + } else { + env.deletingPackages = env.deletingPackages.filter(p => !(p.name === pkg.name && (p.version ?? "") === version)); + } + } + + scrollToBottomOfPipModal(index: number) { + setTimeout(() => { + const pre = document.getElementById(`pip-log-${index}`) as HTMLElement | null; + if (pre) { + pre.scrollTop = pre.scrollHeight; + } + }, 50); + } + + updatePrettyPipOutput(index: number) { + const env = this.pves[index]; + + const escapeHtml = (s: string) => + s + .replace(/&/g, "&amp;") + .replace(/</g, "&lt;") + .replace(/>/g, "&gt;") + .replace(/"/g, "&quot;") + .replace(/'/g, "&#39;"); + + const raw = env.pipOutput ?? 
""; + const safe = escapeHtml(raw); + + env.prettyPipOutput = safe + .replace( + /^(\[pip\].*finished with exit code\s+0.*)$/gm, + "<span class=\"pip-exit ok\"><strong>$1</strong></span>" + ) + .replace( + /^(\[pip\].*finished with exit code\s+1.*)$/gm, + "<span class=\"pip-exit err\"><strong>$1</strong></span>" + ) + .replace( + /^(\[pip\].*finished with exit code\s+([2-9]\d*).*)$/gm, + "<span class=\"pip-exit err\"><strong>$1</strong></span>" + ) + .replace(/ERROR/g, "<span class=\"error\">ERROR</span>") + .replace(/WARNING/g, "<span class=\"warning\">WARNING</span>") + .replace(/already satisfied/g, "<span class=\"success\">already satisfied</span>") + .replace(/\n/g, "<br/>"); + } + + createVirtualEnvironment(index: number): void { + const cuId = this.selectedComputingUnit?.computingUnit.cuid; + const env = this.pves[index]; + + if (cuId == null) { + this.notificationService.error("No computing unit selected. Please select a CU first."); + return; + } + + if (!env.name?.trim()) { Review Comment: Same here. How to test the error? 
########## frontend/src/app/workspace/component/power-button/computing-unit-selection.component.ts: ########## @@ -637,4 +670,281 @@ export class ComputingUnitSelectionComponent implements OnInit { this.computingUnitStatusService.refreshComputingUnitList(); } } + + private makeEmptyPve(expanded: boolean): PveDraft { + return { + id: this.nextPveId++, + name: "", + userPackages: [], + newPackages: [{ name: "", operator: "==", version: "" }], + deletingPackages: [], + pipOutput: "", + prettyPipOutput: "", + expanded, + isInstalling: false, + }; + } + + addEnvironment(): void { + this.pves.push(this.makeEmptyPve(true)); + } + + trackByPveId(_index: number, pve: PveDraft): number { + return pve.id; + } + + showPVEmodalVisible(): void { + this.pveModalVisible = true; + this.getPVEs(); + } + + getPVEs(): void { + const cuId: number | undefined = this.selectedComputingUnit?.computingUnit.cuid; + + if (cuId == null) { + this.notificationService.error("No computing unit selected. Please select a CU first."); + return; + } + + this.workflowPveService.setCuid(cuId); + + this.workflowPveService + .fetchPVEs(cuId) + .pipe(untilDestroyed(this)) + .subscribe({ + next: (resp: PvePackageResponse[]) => { + this.pves = resp.map((pve, index) => ({ + id: index, + name: pve.pveName, + expanded: false, + isInstalling: false, + pipOutput: "", + prettyPipOutput: "", + userPackages: (pve.userPackages ?? []).map(pkg => { + const [name, version] = pkg.split("=="); + return { + name: name.trim(), + version: (version ?? 
"").trim(), + isHighlighted: false, + }; + }), + newPackages: [], + deletingPackages: [], + })); + }, + error: (err: unknown) => { + console.error("Failed to fetch PVEs:", err); + this.pves = []; + }, + }); + } + + addPackage(index: number): void { + const env = this.pves[index]; + env.newPackages.push({ name: "", version: "", operator: undefined, isHighlighted: false }); + } + + togglePackageDelete(index: number, pkg: PackageRow): void { + const env = this.pves[index]; + + pkg.isHighlighted = !pkg.isHighlighted; + + const version = pkg.version ?? ""; + + if (pkg.isHighlighted) { + const exists = env.deletingPackages.some(p => p.name === pkg.name && (p.version ?? "") === version); + if (!exists) { + env.deletingPackages.push({ name: pkg.name, version }); + } + } else { + env.deletingPackages = env.deletingPackages.filter(p => !(p.name === pkg.name && (p.version ?? "") === version)); + } + } + + scrollToBottomOfPipModal(index: number) { + setTimeout(() => { + const pre = document.getElementById(`pip-log-${index}`) as HTMLElement | null; + if (pre) { + pre.scrollTop = pre.scrollHeight; + } + }, 50); + } + + updatePrettyPipOutput(index: number) { Review Comment: Comment for this method. How you prettified the output. ########## frontend/src/app/workspace/component/power-button/computing-unit-selection.component.ts: ########## @@ -56,6 +56,27 @@ import { isComputingUnitShmTooLarge, getJvmMemorySliderConfig, } from "../../../common/util/computing-unit.util"; +import { PvePackageResponse, WorkflowPveService } from "../../service/virtual-environment/virtual-environment.service"; + +type PackageRow = { + name: string; + operator?: "==" | ">=" | "<="; + version?: string; + isHighlighted?: boolean; +}; + +type PveDraft = { + id: number; Review Comment: Do we need a PVE ID in this PR? 
########## amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveResource.scala: ########## @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.web.resource.pythonvirtualenvironment + +import com.fasterxml.jackson.databind.ObjectMapper +import org.glassfish.jersey.server.ChunkedOutput + +import java.util.concurrent.LinkedBlockingQueue +import javax.ws.rs._ +import javax.ws.rs.core.MediaType +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.Future +import scala.jdk.CollectionConverters._ +import java.util + +case class PackageResponse(system: java.util.List[String], user: java.util.List[String]) + +@Path("/pve") +@Consumes(Array(MediaType.APPLICATION_JSON)) +class PveResource { + + // -------------------------------------------------- + // Create / Install packages (SSE) + // -------------------------------------------------- + @GET + @Produces(Array("text/event-stream")) + def createPve( + @QueryParam("packages") packagesJson: String, + @QueryParam("cuid") cuid: Int, + @QueryParam("pveName") pveName: String + ): ChunkedOutput[String] = { + + if (cuid == 0) { + throw new BadRequestException("Missing or 
invalid cuid") + } + + if (pveName == null || pveName.trim.isEmpty) { + throw new BadRequestException("Missing environment name") + } + + val mapper = new ObjectMapper() + val packages = + if (packagesJson == null || packagesJson.trim.isEmpty) + List.empty[String] + else + mapper.readValue(packagesJson, classOf[Array[String]]).toList + + val queue = new LinkedBlockingQueue[String]() + val chunkedOutput = new ChunkedOutput[String](classOf[String]) + + Future { + try { + + if (!PveManager.pveExists(cuid, pveName)) { + PveManager.createNewPve(cuid, queue, pveName) + } + + if (packages.nonEmpty) { + PveManager.installPackages(packages, cuid, queue, pveName) + } + + } catch { + case e: Exception => + queue.put(s"[ERR] ${e.getMessage}") + } finally { + queue.put("__DONE__") + } + } + + Future { + var done = false + while (!done) { + val line = queue.take() + if (line == "__DONE__") { + chunkedOutput.write("data: __DONE__\n\n") + done = true + } else chunkedOutput.write(s"data: $line\n\n") + } + chunkedOutput.close() + } + + chunkedOutput + } + + // -------------------------------------------------- + // Get installed packages + // -------------------------------------------------- + @GET + @Path("/packages") + @Produces(Array(MediaType.APPLICATION_JSON)) + def getInstalledPackages( + @QueryParam("cuid") cuid: Int, + @QueryParam("pveName") pveName: String + ): util.Map[String, util.List[String]] = { + + if (cuid == 0) { Review Comment: Is there a scenario where cuid can be 0? ########## amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveManager.scala: ########## @@ -0,0 +1,544 @@ +package org.apache.texera.web.resource.pythonvirtualenvironment +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.{File, RandomAccessFile} +import java.nio.charset.StandardCharsets +import java.nio.file.{Files, Path, Paths, StandardOpenOption} +import java.util.concurrent.BlockingQueue +import scala.collection.mutable.Map +import scala.jdk.CollectionConverters._ +import scala.sys.process._ + +object PveManager { + + private val VenvRoot: Path = Paths.get("/tmp/texera-pve/venvs") + + private def ensureDirExists(path: Path): Unit = { + if (!Files.exists(path)) Files.createDirectories(path) + } + + private def cuidDir(cuid: Int, pvename: String): Path = { + ensureDirExists(VenvRoot) + val cuIdDir = VenvRoot.resolve(cuid.toString) + ensureDirExists(cuIdDir) + + val dir = cuIdDir.resolve(pvename) + ensureDirExists(dir) + + dir + } + + private def pveDir(cuid: Int, pveName: String): Path = + cuidDir(cuid, pveName).resolve("pve") + + private def pythonBinPath(cuid: Int, pveName: String): Path = + pveDir(cuid, pveName).resolve("bin").resolve("python") + + private def pipBinPath(cuid: Int, pveName: String): Path = + pveDir(cuid, pveName).resolve("bin").resolve("pip") + + private def metadataDir(cuid: Int, pveName: String): Path = + pveDir(cuid, pveName).resolve("metadata") + + private def systemPackagesPath(cuid: Int, pveName: String): Path = + metadataDir(cuid, pveName).resolve("system-packages.txt") + + private def userPackagesPath(cuid: Int, pveName: String): Path = + 
metadataDir(cuid, pveName).resolve("user-packages.txt") + + private def ensureParentDir(path: Path): Unit = { + val parent = path.getParent + if (parent != null && !Files.exists(parent)) Files.createDirectories(parent) + } + + private def writeMetadata(path: Path, lines: Seq[String]): Unit = { + ensureParentDir(path) + Files.write( + path, + lines.asJava, + StandardOpenOption.CREATE, + StandardOpenOption.TRUNCATE_EXISTING, + StandardOpenOption.WRITE + ) + } + + private def readMetadataList(path: Path): List[String] = { + if (!Files.exists(path)) return Nil + Files.readAllLines(path).asScala.map(_.trim).filter(_.nonEmpty).toList + } + + private def parsePackageName(line: String): String = + line.split("==", 2).headOption.getOrElse(line).trim.toLowerCase + + private def pipEnv: Map[String, String] = + Map( + "PYTHONUNBUFFERED" -> "1", + "PIP_PROGRESS_BAR" -> "off", + "PIP_DISABLE_PIP_VERSION_CHECK" -> "1", + "PIP_NO_INPUT" -> "1" + ) + + def pythonBin(cuid: Int, pveName: String): String = + pythonBinPath(cuid, pveName).toAbsolutePath.toString + + def getSystemAndUserPackages(cuid: Int, pveName: String): (Seq[String], Seq[String]) = { + val sys = readMetadataList(systemPackagesPath(cuid, pveName)) + val usr = readMetadataList(userPackagesPath(cuid, pveName)) + (sys, usr) + } + + def deletePackages(cuid: Int, packageName: String, pveName: String): List[String] = { + val pipPath = pipBinPath(cuid, pveName).toAbsolutePath + val userFile = userPackagesPath(cuid, pveName) + val systemFile = systemPackagesPath(cuid, pveName) + + val systemPackages: Set[String] = + readMetadataList(systemFile).map(parsePackageName).toSet + + val normalizedName = packageName.toLowerCase + if (systemPackages.contains(normalizedName)) { + return List(s"ERROR: '$packageName' is a system package and cannot be deleted.") + } + + if (!Files.exists(pipPath)) { + val msg = s"[PveManager] No pip found at $pipPath — PVE may not exist or is not initialized." 
+ println(msg) + return List(msg) + } + + try { + val command = Seq(pipPath.toString, "uninstall", "-y", packageName) + + val logger = ProcessLogger( + (out: String) => println(s"[pip] $out"), + (err: String) => System.err.println(s"[pip][ERR] $err") + ) + + val exitCode = command.!(logger) + + if (exitCode == 0) { + val existing = readMetadataList(userFile) + val updated = + existing.filterNot(line => parsePackageName(line) == normalizedName).sorted + + writeMetadata(userFile, updated) + + List(s"Exit code: $exitCode", s"Uninstalled $packageName successfully") + } else { + List(s"[PveManager] pip uninstall for '$packageName' failed with exit code $exitCode") + } + } catch { + case e: Exception => + List(s"[PveManager] Failed to delete package for cuid=$cuid: ${e.getMessage}") + } + } + + private def tailFileToQueue( Review Comment: This method needs a comment. Why do we need a separate thread? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
