kunwp1 commented on code in PR #4359:
URL: https://github.com/apache/texera/pull/4359#discussion_r3113252714


##########
amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveManager.scala:
##########
@@ -0,0 +1,544 @@
+package org.apache.texera.web.resource.pythonvirtualenvironment
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.{File, RandomAccessFile}
+import java.nio.charset.StandardCharsets
+import java.nio.file.{Files, Path, Paths, StandardOpenOption}
+import java.util.concurrent.BlockingQueue
+import scala.collection.mutable.Map
+import scala.jdk.CollectionConverters._
+import scala.sys.process._
+
+object PveManager {
+
+  private val VenvRoot: Path = Paths.get("/tmp/texera-pve/venvs")
+
+  private def ensureDirExists(path: Path): Unit = {
+    if (!Files.exists(path)) Files.createDirectories(path)
+  }
+
+  private def cuidDir(cuid: Int, pvename: String): Path = {
+    ensureDirExists(VenvRoot)
+    val cuIdDir = VenvRoot.resolve(cuid.toString)
+    ensureDirExists(cuIdDir)
+
+    val dir = cuIdDir.resolve(pvename)
+    ensureDirExists(dir)
+
+    dir
+  }
+
+  private def pveDir(cuid: Int, pveName: String): Path =
+    cuidDir(cuid, pveName).resolve("pve")
+
+  private def pythonBinPath(cuid: Int, pveName: String): Path =
+    pveDir(cuid, pveName).resolve("bin").resolve("python")
+
+  private def pipBinPath(cuid: Int, pveName: String): Path =
+    pveDir(cuid, pveName).resolve("bin").resolve("pip")
+
+  private def metadataDir(cuid: Int, pveName: String): Path =
+    pveDir(cuid, pveName).resolve("metadata")
+
+  private def systemPackagesPath(cuid: Int, pveName: String): Path =
+    metadataDir(cuid, pveName).resolve("system-packages.txt")
+
+  private def userPackagesPath(cuid: Int, pveName: String): Path =
+    metadataDir(cuid, pveName).resolve("user-packages.txt")
+
+  private def ensureParentDir(path: Path): Unit = {
+    val parent = path.getParent
+    if (parent != null && !Files.exists(parent)) 
Files.createDirectories(parent)
+  }
+
+  private def writeMetadata(path: Path, lines: Seq[String]): Unit = {
+    ensureParentDir(path)
+    Files.write(
+      path,
+      lines.asJava,
+      StandardOpenOption.CREATE,
+      StandardOpenOption.TRUNCATE_EXISTING,
+      StandardOpenOption.WRITE
+    )
+  }
+
+  private def readMetadataList(path: Path): List[String] = {
+    if (!Files.exists(path)) return Nil
+    Files.readAllLines(path).asScala.map(_.trim).filter(_.nonEmpty).toList
+  }
+
+  private def parsePackageName(line: String): String =
+    line.split("==", 2).headOption.getOrElse(line).trim.toLowerCase
+
+  private def pipEnv: Map[String, String] =
+    Map(
+      "PYTHONUNBUFFERED" -> "1",
+      "PIP_PROGRESS_BAR" -> "off",
+      "PIP_DISABLE_PIP_VERSION_CHECK" -> "1",
+      "PIP_NO_INPUT" -> "1"
+    )
+
+  def pythonBin(cuid: Int, pveName: String): String =
+    pythonBinPath(cuid, pveName).toAbsolutePath.toString
+
+  def getSystemAndUserPackages(cuid: Int, pveName: String): (Seq[String], 
Seq[String]) = {
+    val sys = readMetadataList(systemPackagesPath(cuid, pveName))
+    val usr = readMetadataList(userPackagesPath(cuid, pveName))
+    (sys, usr)
+  }
+
+  def deletePackages(cuid: Int, packageName: String, pveName: String): 
List[String] = {
+    val pipPath = pipBinPath(cuid, pveName).toAbsolutePath
+    val userFile = userPackagesPath(cuid, pveName)
+    val systemFile = systemPackagesPath(cuid, pveName)
+
+    val systemPackages: Set[String] =
+      readMetadataList(systemFile).map(parsePackageName).toSet
+
+    val normalizedName = packageName.toLowerCase
+    if (systemPackages.contains(normalizedName)) {
+      return List(s"ERROR: '$packageName' is a system package and cannot be 
deleted.")
+    }
+
+    if (!Files.exists(pipPath)) {
+      val msg = s"[PveManager] No pip found at $pipPath — PVE may not exist or 
is not initialized."
+      println(msg)
+      return List(msg)
+    }
+
+    try {
+      val command = Seq(pipPath.toString, "uninstall", "-y", packageName)
+
+      val logger = ProcessLogger(
+        (out: String) => println(s"[pip] $out"),
+        (err: String) => System.err.println(s"[pip][ERR] $err")
+      )
+
+      val exitCode = command.!(logger)
+
+      if (exitCode == 0) {
+        val existing = readMetadataList(userFile)
+        val updated =
+          existing.filterNot(line => parsePackageName(line) == 
normalizedName).sorted
+
+        writeMetadata(userFile, updated)
+
+        List(s"Exit code: $exitCode", s"Uninstalled $packageName successfully")
+      } else {
+        List(s"[PveManager] pip uninstall for '$packageName' failed with exit 
code $exitCode")
+      }
+    } catch {
+      case e: Exception =>
+        List(s"[PveManager] Failed to delete package for cuid=$cuid: 
${e.getMessage}")
+    }
+  }
+
+  private def tailFileToQueue(
+      file: File,
+      queue: BlockingQueue[String],
+      prefix: String = "[pip] "
+  ): AutoCloseable = {
+    val raf = new RandomAccessFile(file, "r")
+    raf.seek(raf.length())
+    @volatile var running = true
+
+    val thread = new Thread(() => {
+      var buf = new StringBuilder
+      val charset = StandardCharsets.UTF_8
+      try {
+        while (running) {
+          val available = raf.length() - raf.getFilePointer
+          if (available > 0) {
+            val bytes = new Array[Byte](math.min(available, 8192).toInt)
+            val n = raf.read(bytes)
+            if (n > 0) {
+              buf.append(new String(bytes, 0, n, charset))
+              var newlineIndex = buf.indexOf("\n")
+              while (newlineIndex >= 0) {
+                val line = buf.substring(0, newlineIndex).trim
+                if (shouldStream(line)) queue.put(prefix + line)
+                buf = buf.delete(0, newlineIndex + 1)
+                newlineIndex = buf.indexOf("\n")
+              }
+            }
+          } else {
+            Thread.sleep(100)
+          }
+        }
+        val last = buf.result().trim
+        if (shouldStream(last)) queue.put(prefix + last)
+      } catch {
+        case _: InterruptedException => ()
+        case e: Exception            => queue.put(s"[pip][ERR] tail exception: 
${e.getMessage}")
+      } finally {
+        raf.close()
+      }
+    })
+
+    thread.setDaemon(true)
+    thread.start()
+
+    new AutoCloseable {
+      override def close(): Unit = {
+        running = false
+        thread.interrupt()
+        thread.join(500)
+      }
+    }
+  }
+
+  private def shouldStream(line: String): Boolean = {
+    val s = line.trim
+    if (s.isEmpty) return false
+
+    val lower = s.toLowerCase
+
+    if (lower.contains("found link")) return false
+    if (lower.contains("skipping link")) return false
+    if (lower.contains("cache")) return false
+    if (lower.contains("caching")) return false
+
+    true
+  }
+
+  private def runPipWithLog(
+      cmd: Seq[String],
+      env: Map[String, String],
+      queue: BlockingQueue[String]
+  ): Int = {
+    val logFile = File.createTempFile("pip-live-", ".log")
+    val fullCmd = if (cmd.contains("--log")) cmd else cmd ++ Seq("--log", 
logFile.getAbsolutePath)
+
+    val tailer = tailFileToQueue(logFile, queue)
+
+    val logger = ProcessLogger(
+      out => if (shouldStream(out)) queue.put(s"[pip/stdout] $out"),
+      err => if (shouldStream(err)) queue.put(s"[pip/stderr] $err")
+    )
+
+    val proc = Process(fullCmd, None, env.toSeq: _*).run(logger)
+    val exitCode = proc.exitValue()
+
+    try tailer.close()
+    catch { case _: Throwable => () }
+
+    queue.put(s"[pip] (log at ${logFile.getAbsolutePath})")
+    exitCode
+  }
+
+  def createNewPve(cuid: Int, queue: BlockingQueue[String], pveName: String): 
Unit = {
+    queue.put(s"[PVE] Creating new PVE for cuid=$cuid with name=$pveName")
+
+    val venvDirPath = pveDir(cuid, pveName).toAbsolutePath
+    ensureDirExists(cuidDir(cuid, pveName))
+
+    val python = pythonBinPath(cuid, pveName).toAbsolutePath.toString
+    val envVars = pipEnv
+
+    val Requirements: String =
+      """wheel==0.41.2

Review Comment:
   It's not a good idea to hardcode all the python packages because it's hard 
to sync with the latest requirements. 



##########
amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveManager.scala:
##########
@@ -0,0 +1,544 @@
+package org.apache.texera.web.resource.pythonvirtualenvironment
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.{File, RandomAccessFile}
+import java.nio.charset.StandardCharsets
+import java.nio.file.{Files, Path, Paths, StandardOpenOption}
+import java.util.concurrent.BlockingQueue
+import scala.collection.mutable.Map
+import scala.jdk.CollectionConverters._
+import scala.sys.process._
+
+object PveManager {
+
+  private val VenvRoot: Path = Paths.get("/tmp/texera-pve/venvs")
+
+  private def ensureDirExists(path: Path): Unit = {
+    if (!Files.exists(path)) Files.createDirectories(path)
+  }
+
+  private def cuidDir(cuid: Int, pvename: String): Path = {
+    ensureDirExists(VenvRoot)

Review Comment:
   I think the VenvRoot and all the subdirectories except PVE related 
directories should be created when a computing unit is created. The lifecycle 
of those directories is not clear in this PR.



##########
amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveManager.scala:
##########


Review Comment:
   Add some comments of what this manager is and also add comment for each 
method if the method is too long.



##########
frontend/src/app/workspace/component/power-button/computing-unit-selection.component.ts:
##########
@@ -56,6 +56,27 @@ import {
   isComputingUnitShmTooLarge,
   getJvmMemorySliderConfig,
 } from "../../../common/util/computing-unit.util";
+import { PvePackageResponse, WorkflowPveService } from 
"../../service/virtual-environment/virtual-environment.service";
+
+type PackageRow = {
+  name: string;
+  operator?: "==" | ">=" | "<=";
+  version?: string;
+  isHighlighted?: boolean;

Review Comment:
   Maybe rename it to `delete` or `deleteToggle`?



##########
amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveManager.scala:
##########
@@ -0,0 +1,544 @@
+package org.apache.texera.web.resource.pythonvirtualenvironment
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.{File, RandomAccessFile}
+import java.nio.charset.StandardCharsets
+import java.nio.file.{Files, Path, Paths, StandardOpenOption}
+import java.util.concurrent.BlockingQueue
+import scala.collection.mutable.Map
+import scala.jdk.CollectionConverters._
+import scala.sys.process._
+
+object PveManager {
+
+  private val VenvRoot: Path = Paths.get("/tmp/texera-pve/venvs")
+
+  private def ensureDirExists(path: Path): Unit = {
+    if (!Files.exists(path)) Files.createDirectories(path)
+  }
+
+  private def cuidDir(cuid: Int, pvename: String): Path = {
+    ensureDirExists(VenvRoot)
+    val cuIdDir = VenvRoot.resolve(cuid.toString)
+    ensureDirExists(cuIdDir)
+
+    val dir = cuIdDir.resolve(pvename)
+    ensureDirExists(dir)
+
+    dir
+  }
+
+  private def pveDir(cuid: Int, pveName: String): Path =
+    cuidDir(cuid, pveName).resolve("pve")
+
+  private def pythonBinPath(cuid: Int, pveName: String): Path =
+    pveDir(cuid, pveName).resolve("bin").resolve("python")
+
+  private def pipBinPath(cuid: Int, pveName: String): Path =
+    pveDir(cuid, pveName).resolve("bin").resolve("pip")
+
+  private def metadataDir(cuid: Int, pveName: String): Path =
+    pveDir(cuid, pveName).resolve("metadata")
+
+  private def systemPackagesPath(cuid: Int, pveName: String): Path =
+    metadataDir(cuid, pveName).resolve("system-packages.txt")
+
+  private def userPackagesPath(cuid: Int, pveName: String): Path =
+    metadataDir(cuid, pveName).resolve("user-packages.txt")
+
+  private def ensureParentDir(path: Path): Unit = {

Review Comment:
   Consider removing `ensureParentDir` and `ensureDirExists`. I don't think we 
need to check whether the directory exists or not. 



##########
amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveManager.scala:
##########
@@ -0,0 +1,544 @@
+package org.apache.texera.web.resource.pythonvirtualenvironment
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.{File, RandomAccessFile}
+import java.nio.charset.StandardCharsets
+import java.nio.file.{Files, Path, Paths, StandardOpenOption}
+import java.util.concurrent.BlockingQueue
+import scala.collection.mutable.Map
+import scala.jdk.CollectionConverters._
+import scala.sys.process._
+
+object PveManager {
+
+  private val VenvRoot: Path = Paths.get("/tmp/texera-pve/venvs")
+
+  private def ensureDirExists(path: Path): Unit = {
+    if (!Files.exists(path)) Files.createDirectories(path)
+  }
+
+  private def cuidDir(cuid: Int, pvename: String): Path = {
+    ensureDirExists(VenvRoot)
+    val cuIdDir = VenvRoot.resolve(cuid.toString)
+    ensureDirExists(cuIdDir)

Review Comment:
   You don't need to run createDirectories step-by-step. Can simply run a 
single createDirectories call.



##########
frontend/src/app/workspace/component/power-button/computing-unit-selection.component.ts:
##########
@@ -637,4 +670,281 @@ export class ComputingUnitSelectionComponent implements 
OnInit {
       this.computingUnitStatusService.refreshComputingUnitList();
     }
   }
+
+  private makeEmptyPve(expanded: boolean): PveDraft {
+    return {
+      id: this.nextPveId++,
+      name: "",
+      userPackages: [],
+      newPackages: [{ name: "", operator: "==", version: "" }],
+      deletingPackages: [],
+      pipOutput: "",
+      prettyPipOutput: "",
+      expanded,
+      isInstalling: false,
+    };
+  }
+
+  addEnvironment(): void {
+    this.pves.push(this.makeEmptyPve(true));
+  }
+
+  trackByPveId(_index: number, pve: PveDraft): number {
+    return pve.id;
+  }
+
+  showPVEmodalVisible(): void {
+    this.pveModalVisible = true;
+    this.getPVEs();
+  }
+
+  getPVEs(): void {
+    const cuId: number | undefined = 
this.selectedComputingUnit?.computingUnit.cuid;
+
+    if (cuId == null) {
+      this.notificationService.error("No computing unit selected. Please 
select a CU first.");

Review Comment:
   I don't see a way to set cuId == null in the frontend. What's a test 
scenario to test this error?



##########
frontend/src/app/workspace/component/power-button/computing-unit-selection.component.ts:
##########
@@ -637,4 +670,281 @@ export class ComputingUnitSelectionComponent implements 
OnInit {
       this.computingUnitStatusService.refreshComputingUnitList();
     }
   }
+
+  private makeEmptyPve(expanded: boolean): PveDraft {
+    return {
+      id: this.nextPveId++,
+      name: "",
+      userPackages: [],
+      newPackages: [{ name: "", operator: "==", version: "" }],
+      deletingPackages: [],
+      pipOutput: "",
+      prettyPipOutput: "",
+      expanded,
+      isInstalling: false,
+    };
+  }
+
+  addEnvironment(): void {
+    this.pves.push(this.makeEmptyPve(true));
+  }
+
+  trackByPveId(_index: number, pve: PveDraft): number {
+    return pve.id;
+  }
+
+  showPVEmodalVisible(): void {
+    this.pveModalVisible = true;
+    this.getPVEs();
+  }
+
+  getPVEs(): void {
+    const cuId: number | undefined = 
this.selectedComputingUnit?.computingUnit.cuid;
+
+    if (cuId == null) {
+      this.notificationService.error("No computing unit selected. Please 
select a CU first.");
+      return;
+    }
+
+    this.workflowPveService.setCuid(cuId);
+
+    this.workflowPveService
+      .fetchPVEs(cuId)
+      .pipe(untilDestroyed(this))
+      .subscribe({
+      next: (resp: PvePackageResponse[]) => {
+        this.pves = resp.map((pve, index) => ({
+          id: index,
+          name: pve.pveName,
+          expanded: false,
+          isInstalling: false,
+          pipOutput: "",
+          prettyPipOutput: "",
+          userPackages: (pve.userPackages ?? []).map(pkg => {
+            const [name, version] = pkg.split("==");
+            return {
+              name: name.trim(),
+              version: (version ?? "").trim(),
+              isHighlighted: false,
+            };
+          }),
+          newPackages: [],
+          deletingPackages: [],
+        }));
+      },
+      error: (err: unknown) => {
+        console.error("Failed to fetch PVEs:", err);
+        this.pves = [];
+      },
+    });
+  }
+
+  addPackage(index: number): void {
+    const env = this.pves[index];
+    env.newPackages.push({ name: "", version: "", operator: undefined, 
isHighlighted: false });
+  }
+
+  togglePackageDelete(index: number, pkg: PackageRow): void {
+    const env = this.pves[index];
+
+    pkg.isHighlighted = !pkg.isHighlighted;
+
+    const version = pkg.version ?? "";
+
+    if (pkg.isHighlighted) {
+      const exists = env.deletingPackages.some(p => p.name === pkg.name && 
(p.version ?? "") === version);
+      if (!exists) {
+        env.deletingPackages.push({ name: pkg.name, version });
+      }
+    } else {
+      env.deletingPackages = env.deletingPackages.filter(p => !(p.name === 
pkg.name && (p.version ?? "") === version));
+    }
+  }
+
+  scrollToBottomOfPipModal(index: number) {
+    setTimeout(() => {
+      const pre = document.getElementById(`pip-log-${index}`) as HTMLElement | 
null;
+      if (pre) {
+        pre.scrollTop = pre.scrollHeight;
+      }
+    }, 50);
+  }
+
+  updatePrettyPipOutput(index: number) {
+    const env = this.pves[index];
+
+    const escapeHtml = (s: string) =>
+      s
+        .replace(/&/g, "&amp;")
+        .replace(/</g, "&lt;")
+        .replace(/>/g, "&gt;")
+        .replace(/"/g, "&quot;")
+        .replace(/'/g, "&#39;");
+
+    const raw = env.pipOutput ?? "";
+    const safe = escapeHtml(raw);
+
+    env.prettyPipOutput = safe
+      .replace(
+        /^(\[pip\].*finished with exit code\s+0.*)$/gm,
+        "<span class=\"pip-exit ok\"><strong>$1</strong></span>"
+      )
+      .replace(
+        /^(\[pip\].*finished with exit code\s+1.*)$/gm,
+        "<span class=\"pip-exit err\"><strong>$1</strong></span>"
+      )
+      .replace(
+        /^(\[pip\].*finished with exit code\s+([2-9]\d*).*)$/gm,
+        "<span class=\"pip-exit err\"><strong>$1</strong></span>"
+      )
+      .replace(/ERROR/g, "<span class=\"error\">ERROR</span>")
+      .replace(/WARNING/g, "<span class=\"warning\">WARNING</span>")
+      .replace(/already satisfied/g, "<span class=\"success\">already 
satisfied</span>")
+      .replace(/\n/g, "<br/>");
+  }
+
+  createVirtualEnvironment(index: number): void {
+    const cuId = this.selectedComputingUnit?.computingUnit.cuid;
+    const env = this.pves[index];
+
+    if (cuId == null) {

Review Comment:
   Same here.



##########
amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveResource.scala:
##########


Review Comment:
   Check if there are any dead codes in this file.



##########
amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveResource.scala:
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.web.resource.pythonvirtualenvironment
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import org.glassfish.jersey.server.ChunkedOutput
+
+import java.util.concurrent.LinkedBlockingQueue
+import javax.ws.rs._
+import javax.ws.rs.core.MediaType
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.Future
+import scala.jdk.CollectionConverters._
+import java.util
+
+case class PackageResponse(system: java.util.List[String], user: 
java.util.List[String])
+
+@Path("/pve")
+@Consumes(Array(MediaType.APPLICATION_JSON))
+class PveResource {
+
+  // --------------------------------------------------
+  // Create / Install packages (SSE)
+  // --------------------------------------------------
+  @GET
+  @Produces(Array("text/event-stream"))
+  def createPve(
+      @QueryParam("packages") packagesJson: String,
+      @QueryParam("cuid") cuid: Int,
+      @QueryParam("pveName") pveName: String
+  ): ChunkedOutput[String] = {
+
+    if (cuid == 0) {
+      throw new BadRequestException("Missing or invalid cuid")
+    }
+
+    if (pveName == null || pveName.trim.isEmpty) {
+      throw new BadRequestException("Missing environment name")
+    }
+
+    val mapper = new ObjectMapper()
+    val packages =
+      if (packagesJson == null || packagesJson.trim.isEmpty)
+        List.empty[String]
+      else
+        mapper.readValue(packagesJson, classOf[Array[String]]).toList
+
+    val queue = new LinkedBlockingQueue[String]()
+    val chunkedOutput = new ChunkedOutput[String](classOf[String])
+
+    Future {
+      try {
+
+        if (!PveManager.pveExists(cuid, pveName)) {
+          PveManager.createNewPve(cuid, queue, pveName)
+        }
+
+        if (packages.nonEmpty) {
+          PveManager.installPackages(packages, cuid, queue, pveName)
+        }
+
+      } catch {
+        case e: Exception =>
+          queue.put(s"[ERR] ${e.getMessage}")
+      } finally {
+        queue.put("__DONE__")
+      }
+    }
+
+    Future {
+      var done = false
+      while (!done) {
+        val line = queue.take()
+        if (line == "__DONE__") {
+          chunkedOutput.write("data: __DONE__\n\n")
+          done = true
+        } else chunkedOutput.write(s"data: $line\n\n")
+      }
+      chunkedOutput.close()
+    }
+
+    chunkedOutput
+  }
+
+  // --------------------------------------------------
+  // Get installed packages
+  // --------------------------------------------------
+  @GET
+  @Path("/packages")
+  @Produces(Array(MediaType.APPLICATION_JSON))
+  def getInstalledPackages(
+      @QueryParam("cuid") cuid: Int,
+      @QueryParam("pveName") pveName: String
+  ): util.Map[String, util.List[String]] = {
+
+    if (cuid == 0) {
+      throw new BadRequestException("Missing or invalid cuid")
+    }
+
+    if (pveName == null || pveName.trim.isEmpty) {

Review Comment:
   Is there a scenario where pveName can be empty when the backend is 
executing? This might be a deadcode.



##########
amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveManager.scala:
##########
@@ -0,0 +1,544 @@
+package org.apache.texera.web.resource.pythonvirtualenvironment
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.{File, RandomAccessFile}
+import java.nio.charset.StandardCharsets
+import java.nio.file.{Files, Path, Paths, StandardOpenOption}
+import java.util.concurrent.BlockingQueue
+import scala.collection.mutable.Map
+import scala.jdk.CollectionConverters._
+import scala.sys.process._
+
+object PveManager {
+
+  private val VenvRoot: Path = Paths.get("/tmp/texera-pve/venvs")
+
+  private def ensureDirExists(path: Path): Unit = {
+    if (!Files.exists(path)) Files.createDirectories(path)
+  }
+
+  private def cuidDir(cuid: Int, pvename: String): Path = {
+    ensureDirExists(VenvRoot)
+    val cuIdDir = VenvRoot.resolve(cuid.toString)
+    ensureDirExists(cuIdDir)
+
+    val dir = cuIdDir.resolve(pvename)
+    ensureDirExists(dir)
+
+    dir
+  }
+
+  private def pveDir(cuid: Int, pveName: String): Path =
+    cuidDir(cuid, pveName).resolve("pve")
+
+  private def pythonBinPath(cuid: Int, pveName: String): Path =
+    pveDir(cuid, pveName).resolve("bin").resolve("python")
+
+  private def pipBinPath(cuid: Int, pveName: String): Path =
+    pveDir(cuid, pveName).resolve("bin").resolve("pip")
+
+  private def metadataDir(cuid: Int, pveName: String): Path =
+    pveDir(cuid, pveName).resolve("metadata")
+
+  private def systemPackagesPath(cuid: Int, pveName: String): Path =
+    metadataDir(cuid, pveName).resolve("system-packages.txt")
+
+  private def userPackagesPath(cuid: Int, pveName: String): Path =
+    metadataDir(cuid, pveName).resolve("user-packages.txt")
+
+  private def ensureParentDir(path: Path): Unit = {
+    val parent = path.getParent
+    if (parent != null && !Files.exists(parent)) 
Files.createDirectories(parent)
+  }
+
+  private def writeMetadata(path: Path, lines: Seq[String]): Unit = {
+    ensureParentDir(path)
+    Files.write(
+      path,
+      lines.asJava,
+      StandardOpenOption.CREATE,
+      StandardOpenOption.TRUNCATE_EXISTING,
+      StandardOpenOption.WRITE
+    )
+  }
+
+  private def readMetadataList(path: Path): List[String] = {
+    if (!Files.exists(path)) return Nil
+    Files.readAllLines(path).asScala.map(_.trim).filter(_.nonEmpty).toList
+  }
+
+  private def parsePackageName(line: String): String =
+    line.split("==", 2).headOption.getOrElse(line).trim.toLowerCase
+
+  private def pipEnv: Map[String, String] =
+    Map(
+      "PYTHONUNBUFFERED" -> "1",
+      "PIP_PROGRESS_BAR" -> "off",
+      "PIP_DISABLE_PIP_VERSION_CHECK" -> "1",
+      "PIP_NO_INPUT" -> "1"
+    )
+
+  def pythonBin(cuid: Int, pveName: String): String =
+    pythonBinPath(cuid, pveName).toAbsolutePath.toString
+
+  def getSystemAndUserPackages(cuid: Int, pveName: String): (Seq[String], 
Seq[String]) = {
+    val sys = readMetadataList(systemPackagesPath(cuid, pveName))
+    val usr = readMetadataList(userPackagesPath(cuid, pveName))
+    (sys, usr)
+  }
+
+  def deletePackages(cuid: Int, packageName: String, pveName: String): 
List[String] = {
+    val pipPath = pipBinPath(cuid, pveName).toAbsolutePath
+    val userFile = userPackagesPath(cuid, pveName)
+    val systemFile = systemPackagesPath(cuid, pveName)
+
+    val systemPackages: Set[String] =
+      readMetadataList(systemFile).map(parsePackageName).toSet
+
+    val normalizedName = packageName.toLowerCase
+    if (systemPackages.contains(normalizedName)) {
+      return List(s"ERROR: '$packageName' is a system package and cannot be 
deleted.")
+    }
+
+    if (!Files.exists(pipPath)) {
+      val msg = s"[PveManager] No pip found at $pipPath — PVE may not exist or 
is not initialized."
+      println(msg)
+      return List(msg)
+    }
+
+    try {
+      val command = Seq(pipPath.toString, "uninstall", "-y", packageName)
+
+      val logger = ProcessLogger(
+        (out: String) => println(s"[pip] $out"),
+        (err: String) => System.err.println(s"[pip][ERR] $err")
+      )
+
+      val exitCode = command.!(logger)
+
+      if (exitCode == 0) {
+        val existing = readMetadataList(userFile)
+        val updated =
+          existing.filterNot(line => parsePackageName(line) == 
normalizedName).sorted
+
+        writeMetadata(userFile, updated)
+
+        List(s"Exit code: $exitCode", s"Uninstalled $packageName successfully")
+      } else {
+        List(s"[PveManager] pip uninstall for '$packageName' failed with exit 
code $exitCode")
+      }
+    } catch {
+      case e: Exception =>
+        List(s"[PveManager] Failed to delete package for cuid=$cuid: 
${e.getMessage}")
+    }
+  }
+
+  private def tailFileToQueue(
+      file: File,
+      queue: BlockingQueue[String],
+      prefix: String = "[pip] "
+  ): AutoCloseable = {
+    val raf = new RandomAccessFile(file, "r")
+    raf.seek(raf.length())
+    @volatile var running = true
+
+    val thread = new Thread(() => {
+      var buf = new StringBuilder
+      val charset = StandardCharsets.UTF_8
+      try {
+        while (running) {
+          val available = raf.length() - raf.getFilePointer
+          if (available > 0) {
+            val bytes = new Array[Byte](math.min(available, 8192).toInt)
+            val n = raf.read(bytes)
+            if (n > 0) {
+              buf.append(new String(bytes, 0, n, charset))
+              var newlineIndex = buf.indexOf("\n")
+              while (newlineIndex >= 0) {
+                val line = buf.substring(0, newlineIndex).trim
+                if (shouldStream(line)) queue.put(prefix + line)
+                buf = buf.delete(0, newlineIndex + 1)
+                newlineIndex = buf.indexOf("\n")
+              }
+            }
+          } else {
+            Thread.sleep(100)
+          }
+        }
+        val last = buf.result().trim
+        if (shouldStream(last)) queue.put(prefix + last)
+      } catch {
+        case _: InterruptedException => ()
+        case e: Exception            => queue.put(s"[pip][ERR] tail exception: 
${e.getMessage}")
+      } finally {
+        raf.close()
+      }
+    })
+
+    thread.setDaemon(true)
+    thread.start()
+
+    new AutoCloseable {
+      override def close(): Unit = {
+        running = false
+        thread.interrupt()
+        thread.join(500)
+      }
+    }
+  }
+
+  private def shouldStream(line: String): Boolean = {
+    val s = line.trim
+    if (s.isEmpty) return false
+
+    val lower = s.toLowerCase
+
+    if (lower.contains("found link")) return false
+    if (lower.contains("skipping link")) return false
+    if (lower.contains("cache")) return false
+    if (lower.contains("caching")) return false
+
+    true
+  }
+
+  private def runPipWithLog(
+      cmd: Seq[String],
+      env: Map[String, String],
+      queue: BlockingQueue[String]
+  ): Int = {
+    val logFile = File.createTempFile("pip-live-", ".log")
+    val fullCmd = if (cmd.contains("--log")) cmd else cmd ++ Seq("--log", 
logFile.getAbsolutePath)
+
+    val tailer = tailFileToQueue(logFile, queue)
+
+    val logger = ProcessLogger(
+      out => if (shouldStream(out)) queue.put(s"[pip/stdout] $out"),
+      err => if (shouldStream(err)) queue.put(s"[pip/stderr] $err")
+    )
+
+    val proc = Process(fullCmd, None, env.toSeq: _*).run(logger)
+    val exitCode = proc.exitValue()
+
+    try tailer.close()
+    catch { case _: Throwable => () }
+
+    queue.put(s"[pip] (log at ${logFile.getAbsolutePath})")
+    exitCode
+  }
+
+  def createNewPve(cuid: Int, queue: BlockingQueue[String], pveName: String): 
Unit = {

Review Comment:
   Add comment of this method. It's too long to understand.



##########
amber/src/main/scala/org/apache/texera/web/TexeraWebApplication.scala:
##########
@@ -160,7 +161,7 @@ class TexeraWebApplication
     environment.jersey.register(classOf[UserQuotaResource])
     environment.jersey.register(classOf[AdminSettingsResource])
     environment.jersey.register(classOf[AIAssistantResource])
-

Review Comment:
   Does the creation of PVE involve Texera Web Application?



##########
bin/k8s/templates/gateway-routes.yaml:
##########
@@ -100,27 +101,47 @@ spec:
   parentRefs:
     - name: {{ .Release.Name }}-gateway
   {{- if and .Values.gatewayConfig .Values.gatewayConfig.hostname }}
-  hostnames:
-    - {{ .Values.gatewayConfig.hostname }}
+hostnames:
+  - {{ .Values.gatewayConfig.hostname }}
   {{- end }}
-  rules:
-    - matches:
-        - path:
-            type: PathPrefix
-            value: /wsapi
-        - path:
-            type: RegularExpression
-            value: "^/api/executions/\\d+/stats/\\d+$"
-        - path:
-            type: PathPrefix
-            value: /api/executions/result/export
-      backendRefs:
-        - group: gateway.envoyproxy.io
-          kind: Backend
-          name: texera-dynamic-backend
+rules:
+  - matches:
+      - path:
+          type: PathPrefix
+          value: /pve
+    timeouts:
+      request: 10m

Review Comment:
   Why do we need timeouts and why is it 10 minutes? 



##########
frontend/src/app/common/type/workflow-computing-unit.ts:
##########
@@ -50,6 +50,7 @@ export interface DashboardWorkflowComputingUnit {
   metrics: WorkflowComputingUnitMetrics;
   isOwner: boolean;
   accessPrivilege: "READ" | "WRITE" | "NONE";
+  packages: string[];

Review Comment:
   Do we need this change? If so, can you add a comment?



##########
bin/k8s/templates/gateway-routes.yaml:
##########


Review Comment:
   Please remove all the unnecessary diffs from this file. I see so many of 
them.



##########
frontend/src/app/workspace/component/power-button/computing-unit-selection.component.ts:
##########
@@ -637,4 +670,281 @@ export class ComputingUnitSelectionComponent implements 
OnInit {
       this.computingUnitStatusService.refreshComputingUnitList();
     }
   }
+
+  private makeEmptyPve(expanded: boolean): PveDraft {
+    return {
+      id: this.nextPveId++,
+      name: "",
+      userPackages: [],
+      newPackages: [{ name: "", operator: "==", version: "" }],
+      deletingPackages: [],
+      pipOutput: "",
+      prettyPipOutput: "",
+      expanded,
+      isInstalling: false,
+    };
+  }
+
+  addEnvironment(): void {
+    this.pves.push(this.makeEmptyPve(true));
+  }
+
+  trackByPveId(_index: number, pve: PveDraft): number {

Review Comment:
   Remove this?



##########
amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveResource.scala:
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.web.resource.pythonvirtualenvironment
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import org.glassfish.jersey.server.ChunkedOutput
+
+import java.util.concurrent.LinkedBlockingQueue
+import javax.ws.rs._
+import javax.ws.rs.core.MediaType
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.Future
+import scala.jdk.CollectionConverters._
+import java.util
+
+case class PackageResponse(system: java.util.List[String], user: 
java.util.List[String])
+
+@Path("/pve")
+@Consumes(Array(MediaType.APPLICATION_JSON))
+class PveResource {
+
+  // --------------------------------------------------
+  // Create / Install packages (SSE)
+  // --------------------------------------------------
+  @GET
+  @Produces(Array("text/event-stream"))
+  def createPve(
+      @QueryParam("packages") packagesJson: String,
+      @QueryParam("cuid") cuid: Int,
+      @QueryParam("pveName") pveName: String
+  ): ChunkedOutput[String] = {
+
+    if (cuid == 0) {
+      throw new BadRequestException("Missing or invalid cuid")
+    }
+
+    if (pveName == null || pveName.trim.isEmpty) {
+      throw new BadRequestException("Missing environment name")
+    }
+
+    val mapper = new ObjectMapper()
+    val packages =
+      if (packagesJson == null || packagesJson.trim.isEmpty)
+        List.empty[String]
+      else
+        mapper.readValue(packagesJson, classOf[Array[String]]).toList
+
+    val queue = new LinkedBlockingQueue[String]()
+    val chunkedOutput = new ChunkedOutput[String](classOf[String])
+
+    Future {
+      try {
+
+        if (!PveManager.pveExists(cuid, pveName)) {
+          PveManager.createNewPve(cuid, queue, pveName)
+        }
+
+        if (packages.nonEmpty) {
+          PveManager.installPackages(packages, cuid, queue, pveName)
+        }
+
+      } catch {
+        case e: Exception =>
+          queue.put(s"[ERR] ${e.getMessage}")
+      } finally {
+        queue.put("__DONE__")
+      }
+    }
+
+    Future {
+      var done = false
+      while (!done) {
+        val line = queue.take()
+        if (line == "__DONE__") {
+          chunkedOutput.write("data: __DONE__\n\n")
+          done = true
+        } else chunkedOutput.write(s"data: $line\n\n")
+      }
+      chunkedOutput.close()
+    }
+
+    chunkedOutput
+  }
+
+  // --------------------------------------------------
+  // Get installed packages
+  // --------------------------------------------------
+  @GET
+  @Path("/packages")
+  @Produces(Array(MediaType.APPLICATION_JSON))
+  def getInstalledPackages(
+      @QueryParam("cuid") cuid: Int,
+      @QueryParam("pveName") pveName: String
+  ): util.Map[String, util.List[String]] = {
+
+    if (cuid == 0) {
+      throw new BadRequestException("Missing or invalid cuid")
+    }
+
+    if (pveName == null || pveName.trim.isEmpty) {
+      throw new BadRequestException("Missing environment name")
+    }
+
+    try {
+
+      println(s"[PVE] HIT getInstalledPackages cuid=$cuid")
+
+      val (systemPkgsRaw, userPkgsRaw) = 
PveManager.getSystemAndUserPackages(cuid, pveName)
+
+      println(s"[PVE] raw systemPkgsRaw isNull=${systemPkgsRaw == null} 
value=$systemPkgsRaw")
+      println(s"[PVE] raw userPkgsRaw   isNull=${userPkgsRaw == null} 
value=$userPkgsRaw")
+
+      val systemPkgs = 
Option(systemPkgsRaw).getOrElse(Seq.empty[String]).toList.asJava
+      val userPkgs = 
Option(userPkgsRaw).getOrElse(Seq.empty[String]).toList.asJava
+
+      val resp = Map("system" -> systemPkgs, "user" -> userPkgs).asJava
+
+      println(
+        s"[PVE] returning keys=${resp.keySet()} 
systemSize=${systemPkgs.size()} userSize=${userPkgs.size()}"
+      )
+
+      resp
+
+    } catch {
+      case e: Exception =>
+        e.printStackTrace()
+        Map(
+          "system" -> List(s"Error: ${e.getMessage}").asJava,
+          "user" -> List.empty[String].asJava
+        ).asJava
+    }
+  }
+
+  // --------------------------------------------------
+  // Fetch PVEs
+  // --------------------------------------------------
+  @GET
+  @Path("/pves")
+  @Produces(Array(MediaType.APPLICATION_JSON))
+  def fetchPVEs(@QueryParam("cuid") cuid: Int): util.List[util.Map[String, 
Object]] = {
+
+    if (cuid == 0) {

Review Comment:
   Is there a scenario where cuid can be 0?



##########
frontend/src/app/workspace/component/power-button/computing-unit-selection.component.ts:
##########
@@ -637,4 +670,281 @@ export class ComputingUnitSelectionComponent implements 
OnInit {
       this.computingUnitStatusService.refreshComputingUnitList();
     }
   }
+
+  private makeEmptyPve(expanded: boolean): PveDraft {
+    return {
+      id: this.nextPveId++,
+      name: "",
+      userPackages: [],
+      newPackages: [{ name: "", operator: "==", version: "" }],
+      deletingPackages: [],
+      pipOutput: "",
+      prettyPipOutput: "",
+      expanded,
+      isInstalling: false,
+    };
+  }
+
+  addEnvironment(): void {
+    this.pves.push(this.makeEmptyPve(true));
+  }
+
+  trackByPveId(_index: number, pve: PveDraft): number {
+    return pve.id;
+  }
+
+  showPVEmodalVisible(): void {
+    this.pveModalVisible = true;
+    this.getPVEs();
+  }
+
+  getPVEs(): void {
+    const cuId: number | undefined = 
this.selectedComputingUnit?.computingUnit.cuid;
+
+    if (cuId == null) {
+      this.notificationService.error("No computing unit selected. Please 
select a CU first.");
+      return;
+    }
+
+    this.workflowPveService.setCuid(cuId);
+
+    this.workflowPveService
+      .fetchPVEs(cuId)
+      .pipe(untilDestroyed(this))
+      .subscribe({
+      next: (resp: PvePackageResponse[]) => {
+        this.pves = resp.map((pve, index) => ({
+          id: index,
+          name: pve.pveName,
+          expanded: false,
+          isInstalling: false,
+          pipOutput: "",
+          prettyPipOutput: "",
+          userPackages: (pve.userPackages ?? []).map(pkg => {
+            const [name, version] = pkg.split("==");
+            return {
+              name: name.trim(),
+              version: (version ?? "").trim(),
+              isHighlighted: false,
+            };
+          }),
+          newPackages: [],
+          deletingPackages: [],
+        }));
+      },
+      error: (err: unknown) => {
+        console.error("Failed to fetch PVEs:", err);
+        this.pves = [];
+      },
+    });
+  }
+
+  addPackage(index: number): void {
+    const env = this.pves[index];
+    env.newPackages.push({ name: "", version: "", operator: undefined, 
isHighlighted: false });
+  }
+
+  togglePackageDelete(index: number, pkg: PackageRow): void {
+    const env = this.pves[index];
+
+    pkg.isHighlighted = !pkg.isHighlighted;
+
+    const version = pkg.version ?? "";
+
+    if (pkg.isHighlighted) {
+      const exists = env.deletingPackages.some(p => p.name === pkg.name && 
(p.version ?? "") === version);
+      if (!exists) {
+        env.deletingPackages.push({ name: pkg.name, version });
+      }
+    } else {
+      env.deletingPackages = env.deletingPackages.filter(p => !(p.name === 
pkg.name && (p.version ?? "") === version));
+    }
+  }
+
+  scrollToBottomOfPipModal(index: number) {
+    setTimeout(() => {
+      const pre = document.getElementById(`pip-log-${index}`) as HTMLElement | 
null;
+      if (pre) {
+        pre.scrollTop = pre.scrollHeight;
+      }
+    }, 50);
+  }
+
+  updatePrettyPipOutput(index: number) {
+    const env = this.pves[index];
+
+    const escapeHtml = (s: string) =>
+      s
+        .replace(/&/g, "&amp;")
+        .replace(/</g, "&lt;")
+        .replace(/>/g, "&gt;")
+        .replace(/"/g, "&quot;")
+        .replace(/'/g, "&#39;");
+
+    const raw = env.pipOutput ?? "";
+    const safe = escapeHtml(raw);
+
+    env.prettyPipOutput = safe
+      .replace(
+        /^(\[pip\].*finished with exit code\s+0.*)$/gm,
+        "<span class=\"pip-exit ok\"><strong>$1</strong></span>"
+      )
+      .replace(
+        /^(\[pip\].*finished with exit code\s+1.*)$/gm,
+        "<span class=\"pip-exit err\"><strong>$1</strong></span>"
+      )
+      .replace(
+        /^(\[pip\].*finished with exit code\s+([2-9]\d*).*)$/gm,
+        "<span class=\"pip-exit err\"><strong>$1</strong></span>"
+      )
+      .replace(/ERROR/g, "<span class=\"error\">ERROR</span>")
+      .replace(/WARNING/g, "<span class=\"warning\">WARNING</span>")
+      .replace(/already satisfied/g, "<span class=\"success\">already 
satisfied</span>")
+      .replace(/\n/g, "<br/>");
+  }
+
+  createVirtualEnvironment(index: number): void {
+    const cuId = this.selectedComputingUnit?.computingUnit.cuid;
+    const env = this.pves[index];
+
+    if (cuId == null) {
+      this.notificationService.error("No computing unit selected. Please 
select a CU first.");
+      return;
+    }
+
+    if (!env.name?.trim()) {

Review Comment:
   Same here. How to test the error?



##########
frontend/src/app/workspace/component/power-button/computing-unit-selection.component.ts:
##########
@@ -637,4 +670,281 @@ export class ComputingUnitSelectionComponent implements 
OnInit {
       this.computingUnitStatusService.refreshComputingUnitList();
     }
   }
+
+  private makeEmptyPve(expanded: boolean): PveDraft {
+    return {
+      id: this.nextPveId++,
+      name: "",
+      userPackages: [],
+      newPackages: [{ name: "", operator: "==", version: "" }],
+      deletingPackages: [],
+      pipOutput: "",
+      prettyPipOutput: "",
+      expanded,
+      isInstalling: false,
+    };
+  }
+
+  addEnvironment(): void {
+    this.pves.push(this.makeEmptyPve(true));
+  }
+
+  trackByPveId(_index: number, pve: PveDraft): number {
+    return pve.id;
+  }
+
+  showPVEmodalVisible(): void {
+    this.pveModalVisible = true;
+    this.getPVEs();
+  }
+
+  getPVEs(): void {
+    const cuId: number | undefined = 
this.selectedComputingUnit?.computingUnit.cuid;
+
+    if (cuId == null) {
+      this.notificationService.error("No computing unit selected. Please 
select a CU first.");
+      return;
+    }
+
+    this.workflowPveService.setCuid(cuId);
+
+    this.workflowPveService
+      .fetchPVEs(cuId)
+      .pipe(untilDestroyed(this))
+      .subscribe({
+      next: (resp: PvePackageResponse[]) => {
+        this.pves = resp.map((pve, index) => ({
+          id: index,
+          name: pve.pveName,
+          expanded: false,
+          isInstalling: false,
+          pipOutput: "",
+          prettyPipOutput: "",
+          userPackages: (pve.userPackages ?? []).map(pkg => {
+            const [name, version] = pkg.split("==");
+            return {
+              name: name.trim(),
+              version: (version ?? "").trim(),
+              isHighlighted: false,
+            };
+          }),
+          newPackages: [],
+          deletingPackages: [],
+        }));
+      },
+      error: (err: unknown) => {
+        console.error("Failed to fetch PVEs:", err);
+        this.pves = [];
+      },
+    });
+  }
+
+  addPackage(index: number): void {
+    const env = this.pves[index];
+    env.newPackages.push({ name: "", version: "", operator: undefined, 
isHighlighted: false });
+  }
+
+  togglePackageDelete(index: number, pkg: PackageRow): void {
+    const env = this.pves[index];
+
+    pkg.isHighlighted = !pkg.isHighlighted;
+
+    const version = pkg.version ?? "";
+
+    if (pkg.isHighlighted) {
+      const exists = env.deletingPackages.some(p => p.name === pkg.name && 
(p.version ?? "") === version);
+      if (!exists) {
+        env.deletingPackages.push({ name: pkg.name, version });
+      }
+    } else {
+      env.deletingPackages = env.deletingPackages.filter(p => !(p.name === 
pkg.name && (p.version ?? "") === version));
+    }
+  }
+
+  scrollToBottomOfPipModal(index: number) {
+    setTimeout(() => {
+      const pre = document.getElementById(`pip-log-${index}`) as HTMLElement | 
null;
+      if (pre) {
+        pre.scrollTop = pre.scrollHeight;
+      }
+    }, 50);
+  }
+
+  updatePrettyPipOutput(index: number) {

Review Comment:
   Comment for this method. How you prettified the output.



##########
frontend/src/app/workspace/component/power-button/computing-unit-selection.component.ts:
##########
@@ -56,6 +56,27 @@ import {
   isComputingUnitShmTooLarge,
   getJvmMemorySliderConfig,
 } from "../../../common/util/computing-unit.util";
+import { PvePackageResponse, WorkflowPveService } from 
"../../service/virtual-environment/virtual-environment.service";
+
+type PackageRow = {
+  name: string;
+  operator?: "==" | ">=" | "<=";
+  version?: string;
+  isHighlighted?: boolean;
+};
+
+type PveDraft = {
+  id: number;

Review Comment:
   Do we need a PVE ID in this PR?



##########
amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveResource.scala:
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.web.resource.pythonvirtualenvironment
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import org.glassfish.jersey.server.ChunkedOutput
+
+import java.util.concurrent.LinkedBlockingQueue
+import javax.ws.rs._
+import javax.ws.rs.core.MediaType
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.Future
+import scala.jdk.CollectionConverters._
+import java.util
+
+case class PackageResponse(system: java.util.List[String], user: 
java.util.List[String])
+
+@Path("/pve")
+@Consumes(Array(MediaType.APPLICATION_JSON))
+class PveResource {
+
+  // --------------------------------------------------
+  // Create / Install packages (SSE)
+  // --------------------------------------------------
+  @GET
+  @Produces(Array("text/event-stream"))
+  def createPve(
+      @QueryParam("packages") packagesJson: String,
+      @QueryParam("cuid") cuid: Int,
+      @QueryParam("pveName") pveName: String
+  ): ChunkedOutput[String] = {
+
+    if (cuid == 0) {
+      throw new BadRequestException("Missing or invalid cuid")
+    }
+
+    if (pveName == null || pveName.trim.isEmpty) {
+      throw new BadRequestException("Missing environment name")
+    }
+
+    val mapper = new ObjectMapper()
+    val packages =
+      if (packagesJson == null || packagesJson.trim.isEmpty)
+        List.empty[String]
+      else
+        mapper.readValue(packagesJson, classOf[Array[String]]).toList
+
+    val queue = new LinkedBlockingQueue[String]()
+    val chunkedOutput = new ChunkedOutput[String](classOf[String])
+
+    Future {
+      try {
+
+        if (!PveManager.pveExists(cuid, pveName)) {
+          PveManager.createNewPve(cuid, queue, pveName)
+        }
+
+        if (packages.nonEmpty) {
+          PveManager.installPackages(packages, cuid, queue, pveName)
+        }
+
+      } catch {
+        case e: Exception =>
+          queue.put(s"[ERR] ${e.getMessage}")
+      } finally {
+        queue.put("__DONE__")
+      }
+    }
+
+    Future {
+      var done = false
+      while (!done) {
+        val line = queue.take()
+        if (line == "__DONE__") {
+          chunkedOutput.write("data: __DONE__\n\n")
+          done = true
+        } else chunkedOutput.write(s"data: $line\n\n")
+      }
+      chunkedOutput.close()
+    }
+
+    chunkedOutput
+  }
+
+  // --------------------------------------------------
+  // Get installed packages
+  // --------------------------------------------------
+  @GET
+  @Path("/packages")
+  @Produces(Array(MediaType.APPLICATION_JSON))
+  def getInstalledPackages(
+      @QueryParam("cuid") cuid: Int,
+      @QueryParam("pveName") pveName: String
+  ): util.Map[String, util.List[String]] = {
+
+    if (cuid == 0) {

Review Comment:
   Is there a scenario where cuid can be 0?



##########
amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveManager.scala:
##########
@@ -0,0 +1,544 @@
+package org.apache.texera.web.resource.pythonvirtualenvironment
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.{File, RandomAccessFile}
+import java.nio.charset.StandardCharsets
+import java.nio.file.{Files, Path, Paths, StandardOpenOption}
+import java.util.concurrent.BlockingQueue
+import scala.collection.mutable.Map
+import scala.jdk.CollectionConverters._
+import scala.sys.process._
+
+object PveManager {
+
+  private val VenvRoot: Path = Paths.get("/tmp/texera-pve/venvs")
+
+  private def ensureDirExists(path: Path): Unit = {
+    if (!Files.exists(path)) Files.createDirectories(path)
+  }
+
+  private def cuidDir(cuid: Int, pvename: String): Path = {
+    ensureDirExists(VenvRoot)
+    val cuIdDir = VenvRoot.resolve(cuid.toString)
+    ensureDirExists(cuIdDir)
+
+    val dir = cuIdDir.resolve(pvename)
+    ensureDirExists(dir)
+
+    dir
+  }
+
+  private def pveDir(cuid: Int, pveName: String): Path =
+    cuidDir(cuid, pveName).resolve("pve")
+
+  private def pythonBinPath(cuid: Int, pveName: String): Path =
+    pveDir(cuid, pveName).resolve("bin").resolve("python")
+
+  private def pipBinPath(cuid: Int, pveName: String): Path =
+    pveDir(cuid, pveName).resolve("bin").resolve("pip")
+
+  private def metadataDir(cuid: Int, pveName: String): Path =
+    pveDir(cuid, pveName).resolve("metadata")
+
+  private def systemPackagesPath(cuid: Int, pveName: String): Path =
+    metadataDir(cuid, pveName).resolve("system-packages.txt")
+
+  private def userPackagesPath(cuid: Int, pveName: String): Path =
+    metadataDir(cuid, pveName).resolve("user-packages.txt")
+
+  private def ensureParentDir(path: Path): Unit = {
+    val parent = path.getParent
+    if (parent != null && !Files.exists(parent)) 
Files.createDirectories(parent)
+  }
+
+  private def writeMetadata(path: Path, lines: Seq[String]): Unit = {
+    ensureParentDir(path)
+    Files.write(
+      path,
+      lines.asJava,
+      StandardOpenOption.CREATE,
+      StandardOpenOption.TRUNCATE_EXISTING,
+      StandardOpenOption.WRITE
+    )
+  }
+
+  private def readMetadataList(path: Path): List[String] = {
+    if (!Files.exists(path)) return Nil
+    Files.readAllLines(path).asScala.map(_.trim).filter(_.nonEmpty).toList
+  }
+
+  private def parsePackageName(line: String): String =
+    line.split("==", 2).headOption.getOrElse(line).trim.toLowerCase
+
+  private def pipEnv: Map[String, String] =
+    Map(
+      "PYTHONUNBUFFERED" -> "1",
+      "PIP_PROGRESS_BAR" -> "off",
+      "PIP_DISABLE_PIP_VERSION_CHECK" -> "1",
+      "PIP_NO_INPUT" -> "1"
+    )
+
+  def pythonBin(cuid: Int, pveName: String): String =
+    pythonBinPath(cuid, pveName).toAbsolutePath.toString
+
+  def getSystemAndUserPackages(cuid: Int, pveName: String): (Seq[String], 
Seq[String]) = {
+    val sys = readMetadataList(systemPackagesPath(cuid, pveName))
+    val usr = readMetadataList(userPackagesPath(cuid, pveName))
+    (sys, usr)
+  }
+
+  def deletePackages(cuid: Int, packageName: String, pveName: String): 
List[String] = {
+    val pipPath = pipBinPath(cuid, pveName).toAbsolutePath
+    val userFile = userPackagesPath(cuid, pveName)
+    val systemFile = systemPackagesPath(cuid, pveName)
+
+    val systemPackages: Set[String] =
+      readMetadataList(systemFile).map(parsePackageName).toSet
+
+    val normalizedName = packageName.toLowerCase
+    if (systemPackages.contains(normalizedName)) {
+      return List(s"ERROR: '$packageName' is a system package and cannot be 
deleted.")
+    }
+
+    if (!Files.exists(pipPath)) {
+      val msg = s"[PveManager] No pip found at $pipPath — PVE may not exist or 
is not initialized."
+      println(msg)
+      return List(msg)
+    }
+
+    try {
+      val command = Seq(pipPath.toString, "uninstall", "-y", packageName)
+
+      val logger = ProcessLogger(
+        (out: String) => println(s"[pip] $out"),
+        (err: String) => System.err.println(s"[pip][ERR] $err")
+      )
+
+      val exitCode = command.!(logger)
+
+      if (exitCode == 0) {
+        val existing = readMetadataList(userFile)
+        val updated =
+          existing.filterNot(line => parsePackageName(line) == 
normalizedName).sorted
+
+        writeMetadata(userFile, updated)
+
+        List(s"Exit code: $exitCode", s"Uninstalled $packageName successfully")
+      } else {
+        List(s"[PveManager] pip uninstall for '$packageName' failed with exit 
code $exitCode")
+      }
+    } catch {
+      case e: Exception =>
+        List(s"[PveManager] Failed to delete package for cuid=$cuid: 
${e.getMessage}")
+    }
+  }
+
+  private def tailFileToQueue(

Review Comment:
   Need comment about this method. Why do we need a separate thread?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to