kunwp1 commented on code in PR #4359:
URL: https://github.com/apache/texera/pull/4359#discussion_r3096023218


##########
access-control-service/src/main/scala/org/apache/texera/service/resource/AccessControlResource.scala:
##########
@@ -43,6 +43,8 @@ object AccessControlResource extends LazyLogging {
   private val wsapiWorkflowWebsocket: Regex = 
""".*/wsapi/workflow-websocket.*""".r
   private val apiExecutionsStats: Regex = 
""".*/api/executions/[0-9]+/stats/[0-9]+.*""".r
   private val apiExecutionsResultExport: Regex = 
""".*/api/executions/result/export.*""".r
+  private val authPveRoot: Regex = """auth/pve/?""".r

Review Comment:
   Better to combine these two regexes into a single regex.



##########
frontend/src/app/workspace/component/power-button/computing-unit-selection.component.ts:
##########
@@ -61,6 +82,13 @@ import {
   styleUrls: ["./computing-unit-selection.component.scss"],
 })
 export class ComputingUnitSelectionComponent implements OnInit {
+  // variables for creating a virtual environment
+  pves: PveDraft[] = [this.makeEmptyPve(true)];
+  systemPackages: { name: string; version: string }[] = [];
+  pipModalCloseHandler: (() => void) | null = null;
+  PVEmodalVisible = false;

Review Comment:
   Lowercase the "PVE" and capitalize the "M" (i.e. `pveModalVisible`) to make the style consistent with the other fields.



##########
frontend/src/app/workspace/component/power-button/computing-unit-selection.component.ts:
##########
@@ -160,14 +189,18 @@ export class ComputingUnitSelectionComponent implements 
OnInit {
       .subscribe(unit => {
         const wid = this.workflowActionService.getWorkflowMetadata()?.wid;
 
-        // ── compare with the *previous* cuid, not the one we are just about 
to store ──
+        // compare with the previous cuid, not the one we are just about to 
store
         if (isDefined(wid) && unit?.computingUnit.cuid !== 
this.lastSelectedCuid) {
           this.updateWorkflowModificationStatus(wid);
         }
 
-        // update local caches **after** the comparison
+        // update local caches after the comparison

Review Comment:
   Revert this unnecessary change.



##########
frontend/src/app/workspace/component/power-button/computing-unit-selection.component.html:
##########
@@ -134,7 +134,7 @@
               <span
                 *ngIf="unit.status === 'Pending'"
                 class="unit-status-indicator"
-                >(Connecting)</span
+              >(Connecting)</span

Review Comment:
   Revert this change.



##########
amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveManager.scala:
##########
@@ -0,0 +1,661 @@
+package org.apache.texera.web.resource.pythonvirtualenvironment
+
+import java.io.{File, RandomAccessFile}
+import java.nio.charset.StandardCharsets
+import java.nio.file.{Files, Path, Paths, StandardOpenOption}
+import java.util.concurrent.BlockingQueue
+import scala.collection.mutable.Map
+import scala.jdk.CollectionConverters._
+import scala.sys.process._
+
+object PveManager {
+
+  private val VenvRoot: Path = Paths.get("/tmp/texera-pve/venvs")
+
+  private def ensureDirExists(path: Path): Unit = {
+    if (!Files.exists(path)) Files.createDirectories(path)
+  }
+
+  private def cuidDir(cuid: Int, pvename: String): Path = {
+    ensureDirExists(VenvRoot)
+    val cuIdDir = VenvRoot.resolve(cuid.toString)
+    ensureDirExists(cuIdDir)
+
+    val dir = cuIdDir.resolve(pvename)
+    ensureDirExists(dir)
+
+    dir
+  }
+
+  private def pveDir(cuid: Int, pveName: String): Path =
+    cuidDir(cuid, pveName).resolve("pve")
+
+  private def pythonBinPath(cuid: Int, pveName: String): Path =
+    pveDir(cuid, pveName).resolve("bin").resolve("python")
+
+  private def pipBinPath(cuid: Int, pveName: String): Path =
+    pveDir(cuid, pveName).resolve("bin").resolve("pip")
+
+  private def metadataDir(cuid: Int, pveName: String): Path =
+    pveDir(cuid, pveName).resolve("metadata")
+
+  private def systemPackagesPath(cuid: Int, pveName: String): Path =
+    metadataDir(cuid, pveName).resolve("system-packages.txt")
+
+  private def userPackagesPath(cuid: Int, pveName: String): Path =
+    metadataDir(cuid, pveName).resolve("user-packages.txt")
+
+  private def ensureParentDir(path: Path): Unit = {
+    val parent = path.getParent
+    if (parent != null && !Files.exists(parent)) 
Files.createDirectories(parent)
+  }
+
+  private def writeMetadata(path: Path, lines: Seq[String]): Unit = {
+    ensureParentDir(path)
+    Files.write(
+      path,
+      lines.asJava,
+      StandardOpenOption.CREATE,
+      StandardOpenOption.TRUNCATE_EXISTING,
+      StandardOpenOption.WRITE
+    )
+  }
+
+  private def readMetadataList(path: Path): List[String] = {
+    if (!Files.exists(path)) return Nil
+    Files.readAllLines(path).asScala.map(_.trim).filter(_.nonEmpty).toList
+  }
+
+  private def parsePackageName(line: String): String =
+    line.split("==", 2).headOption.getOrElse(line).trim.toLowerCase
+
+  private def pipEnv: Map[String, String] =
+    Map(
+      "PYTHONUNBUFFERED" -> "1",
+      "PIP_PROGRESS_BAR" -> "off",
+      "PIP_DISABLE_PIP_VERSION_CHECK" -> "1",
+      "PIP_NO_INPUT" -> "1"
+    )
+
+  def pythonBin(cuid: Int, pveName: String): String =
+    pythonBinPath(cuid, pveName).toAbsolutePath.toString
+
+  /** Return (systemPkgs, userPkgs) as plain strings from metadata. */
+  def getSystemAndUserPackages(cuid: Int, pveName: String): (Seq[String], 
Seq[String]) = {
+    val sys = readMetadataList(systemPackagesPath(cuid, pveName))
+    val usr = readMetadataList(userPackagesPath(cuid, pveName))
+    (sys, usr)
+  }
+
+  def deletePackages(cuid: Int, packageName: String, pveName: String): 
List[String] = {
+    val pipPath = pipBinPath(cuid, pveName).toAbsolutePath
+    val userFile = userPackagesPath(cuid, pveName)
+    val systemFile = systemPackagesPath(cuid, pveName)
+
+    val systemPackages: Set[String] =
+      readMetadataList(systemFile).map(parsePackageName).toSet
+
+    val normalizedName = packageName.toLowerCase
+    if (systemPackages.contains(normalizedName)) {
+      return List(s"ERROR: '$packageName' is a system package and cannot be 
deleted.")
+    }
+
+    if (!Files.exists(pipPath)) {
+      val msg = s"[PveManager] No pip found at $pipPath — PVE may not exist or 
is not initialized."
+      println(msg)
+      return List(msg)
+    }
+
+    try {
+      val command = Seq(pipPath.toString, "uninstall", "-y", packageName)
+
+      val logger = ProcessLogger(
+        (out: String) => println(s"[pip] $out"),
+        (err: String) => System.err.println(s"[pip][ERR] $err")
+      )
+
+      val exitCode = command.!(logger)
+
+      if (exitCode == 0) {
+        val existing = readMetadataList(userFile)
+        val updated =
+          existing.filterNot(line => parsePackageName(line) == 
normalizedName).sorted
+
+        writeMetadata(userFile, updated)
+
+        List(s"Exit code: $exitCode", s"Uninstalled $packageName successfully")
+      } else {
+        List(s"[PveManager] pip uninstall for '$packageName' failed with exit 
code $exitCode")
+      }
+    } catch {
+      case e: Exception =>
+        List(s"[PveManager] Failed to delete package for cuid=$cuid: 
${e.getMessage}")
+    }
+  }
+
+  private def tailFileToQueue(
+      file: File,
+      queue: BlockingQueue[String],
+      prefix: String = "[pip] "
+  ): AutoCloseable = {
+    val raf = new RandomAccessFile(file, "r")
+    raf.seek(raf.length())
+    @volatile var running = true
+
+    val thread = new Thread(() => {
+      var buf = new StringBuilder
+      val charset = StandardCharsets.UTF_8
+      try {
+        while (running) {
+          val available = raf.length() - raf.getFilePointer
+          if (available > 0) {
+            val bytes = new Array[Byte](math.min(available, 8192).toInt)
+            val n = raf.read(bytes)
+            if (n > 0) {
+              buf.append(new String(bytes, 0, n, charset))
+              var newlineIndex = buf.indexOf("\n")
+              while (newlineIndex >= 0) {
+                val line = buf.substring(0, newlineIndex).trim
+                if (shouldStream(line)) queue.put(prefix + line)
+                buf = buf.delete(0, newlineIndex + 1)
+                newlineIndex = buf.indexOf("\n")
+              }
+            }
+          } else {
+            Thread.sleep(100)
+          }
+        }
+        val last = buf.result().trim
+        if (shouldStream(last)) queue.put(prefix + last)
+      } catch {
+        case _: InterruptedException => ()
+        case e: Exception            => queue.put(s"[pip][ERR] tail exception: 
${e.getMessage}")
+      } finally {
+        raf.close()
+      }
+    })
+
+    thread.setDaemon(true)
+    thread.start()
+
+    new AutoCloseable {
+      override def close(): Unit = {
+        running = false
+        thread.interrupt()
+        thread.join(500)
+      }
+    }
+  }
+
+  private def shouldStream(line: String): Boolean = {
+    val s = line.trim
+    if (s.isEmpty) return false
+
+    val lower = s.toLowerCase
+
+    if (lower.contains("found link")) return false
+    if (lower.contains("skipping link")) return false
+    if (lower.contains("cache")) return false
+    if (lower.contains("caching")) return false
+
+    true
+  }
+
+  private def runPipWithLog(
+      cmd: Seq[String],
+      env: Map[String, String],
+      queue: BlockingQueue[String]
+  ): Int = {
+    val logFile = File.createTempFile("pip-live-", ".log")
+    val fullCmd = if (cmd.contains("--log")) cmd else cmd ++ Seq("--log", 
logFile.getAbsolutePath)
+
+    val tailer = tailFileToQueue(logFile, queue)
+
+    val logger = ProcessLogger(
+      out => if (shouldStream(out)) queue.put(s"[pip/stdout] $out"),
+      err => if (shouldStream(err)) queue.put(s"[pip/stderr] $err")
+    )
+
+    val proc = Process(fullCmd, None, env.toSeq: _*).run(logger)
+    val exitCode = proc.exitValue()
+
+    try tailer.close()
+    catch { case _: Throwable => () }
+
+    queue.put(s"[pip] (log at ${logFile.getAbsolutePath})")
+    exitCode
+  }
+
+//  def createNewPve(cuid: Int, queue: BlockingQueue[String], pveName: 
String): Unit = {

Review Comment:
   Remove this unnecessary commented-out code.



##########
frontend/src/app/workspace/component/power-button/computing-unit-selection.component.ts:
##########
@@ -606,7 +639,7 @@ export class ComputingUnitSelectionComponent implements 
OnInit {
     // Get the current memory in GB
     const memoryValue = parseResourceNumber(this.selectedMemory);
     const memoryUnit = parseResourceUnit(this.selectedMemory);
-    let cuMemoryInGb = memoryUnit === "Gi" ? memoryValue : memoryUnit === "Mi" 
? Math.floor(memoryValue / 1024) : 1;
+    const cuMemoryInGb = memoryUnit === "Gi" ? memoryValue : memoryUnit === 
"Mi" ? Math.floor(memoryValue / 1024) : 1;

Review Comment:
   Revert this change.



##########
frontend/src/app/workspace/component/power-button/computing-unit-selection.component.ts:
##########
@@ -160,14 +189,18 @@ export class ComputingUnitSelectionComponent implements 
OnInit {
       .subscribe(unit => {
         const wid = this.workflowActionService.getWorkflowMetadata()?.wid;
 
-        // ── compare with the *previous* cuid, not the one we are just about 
to store ──
+        // compare with the previous cuid, not the one we are just about to 
store

Review Comment:
   Revert this unnecessary change.



##########
frontend/src/app/workspace/component/power-button/computing-unit-selection.component.html:
##########
@@ -90,7 +90,7 @@
       <span
         *ngIf="!selectedComputingUnit"
         class="connect-text"
-        >Connect</span
+      >Connect</span

Review Comment:
   Revert this change.



##########
amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/WsPveResource.scala:
##########


Review Comment:
   What's the purpose of separating WsPveResource from PveResource? Why not 
combine them?



##########
frontend/src/app/workspace/component/power-button/computing-unit-selection.component.ts:
##########
@@ -227,7 +260,7 @@ export class ComputingUnitSelectionComponent implements 
OnInit {
                 next: (latestWorkflowExecution: WorkflowExecutionsEntry) => {
                   this.selectComputingUnit(this.workflowId, 
latestWorkflowExecution.cuId);
                 },
-                error: (err: unknown) => {
+                error: (_err: unknown) => {

Review Comment:
   Revert this change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to