JorringHsiao opened a new issue, #17788:
URL: https://github.com/apache/dolphinscheduler/issues/17788

   ### Search before asking
   
   - [x] I had searched in the [issues](https://github.com/apache/dolphinscheduler/issues?q=is%3Aissue) and found no similar feature requirement.
   
   
   ### Description
   
   ## risk
   
   With the current log-handling logic, if the remotely executed script emits a large volume of log output in a short time, the server reads all of it into a single very large string.
   If many such tasks run concurrently and each handles that much output at once, the large strings and their copies cause frequent GC and can even lead to an OOM error.
   
   ## source code
   
   ```java
   public class RemoteExecutor implements AutoCloseable {
   
       // ...
   
       public void track(String taskId) throws Exception {
           int logN = 0;
           String pid;
           log.info("Remote shell task log:");
           TaskOutputParameterParser taskOutputParameterParser = new TaskOutputParameterParser();
           do {
               pid = getTaskPid(taskId);
               String trackCommand = String.format(COMMAND.TRACK_COMMAND, logN + 1, getRemoteShellHome(), taskId);
               // 👇  tail -n +XXX XXX.log
               String logLine = runRemote(trackCommand);
               if (StringUtils.isEmpty(logLine)) {
                   Thread.sleep(TRACK_INTERVAL);
               } else {
                   // 👇 Many temporary String objects are created here just to count the lines.
                   logN += logLine.split("\n").length;
                   log.info(logLine);
                   taskOutputParameterParser.appendParseLog(logLine);
               }
           } while (StringUtils.isNotEmpty(pid));
           taskOutputParams.putAll(taskOutputParameterParser.getTaskOutputParams());
       }
   
       public String runRemote(String command) throws IOException {
           try (
                   ChannelExec channel = getSession().createExecChannel(command);
                   // 👇 a large burst of log output ends up in one large byte array
                   ByteArrayOutputStream out = new ByteArrayOutputStream();
                   ByteArrayOutputStream err = new ByteArrayOutputStream()) {
   
               channel.setOut(System.out);
               channel.setOut(out);
               channel.setErr(err);
               channel.open();
               channel.waitFor(EnumSet.of(ClientChannelEvent.CLOSED), 0);
               channel.close();
               Integer exitStatus = channel.getExitStatus();
               if (exitStatus == null || exitStatus != 0) {
                   throw new TaskException(
                           "Remote shell task error, exitStatus: " + exitStatus + " error message: " + err);
               }
               // 👇 toString() copies the buffer into a new String, so the large objects multiply
               return out.toString();
           }
       }
   }
   ```
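   
   As a side note on the line counting above, here is a minimal sketch of counting lines without allocating one `String` per line. `LineCountSketch` and `countLines` are hypothetical names used only for illustration and are not part of DolphinScheduler. The sketch also shows that `String.split("\n")` drops trailing empty strings, so a chunk ending in blank lines yields a smaller count than the number of lines `tail -n +N` would skip; whether this matters for `track` in practice is an assumption not verified here.
   
   ```java
   final class LineCountSketch {
   
       /**
        * Counts the lines in a chunk of output without creating substrings:
        * one line per '\n', plus one more if the last line is unterminated.
        */
       static int countLines(String chunk) {
           if (chunk.isEmpty()) {
               return 0;
           }
           int lines = 0;
           for (int i = 0; i < chunk.length(); i++) {
               if (chunk.charAt(i) == '\n') {
                   lines++;
               }
           }
           if (chunk.charAt(chunk.length() - 1) != '\n') {
               lines++;
           }
           return lines;
       }
   
       public static void main(String[] args) {
           String chunk = "a\nb\n\n"; // three lines, the last one blank
           System.out.println(chunk.split("\n").length); // prints 2
           System.out.println(countLines(chunk));        // prints 3
       }
   }
   ```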
   
   ## a simple example of improvement
   
   remote --> client --> PipedOutputStream --> PipedInputStream --> consumer reads line by line
   
   ```java
   public void runRemote(String command, Consumer<InputStream> handlerOut) throws IOException {
       try (
               ChannelExec channel = getSession().createExecChannel(command);
               PipedInputStream in = new PipedInputStream();       // 👈
               PipedOutputStream out = new PipedOutputStream(in);  // 👈
               ByteArrayOutputStream err = new ByteArrayOutputStream()) {
   
           channel.setOut(System.out);
           channel.setOut(out);
           channel.setErr(err);
           channel.open();
           handlerOut.accept(in);  // 👈
           channel.waitFor(EnumSet.of(ClientChannelEvent.CLOSED), 0);
           channel.close();
           Integer exitStatus = channel.getExitStatus();
           if (exitStatus == null || exitStatus != 0) {
               throw new TaskException(
                       "Remote shell task error, exitStatus: " + exitStatus + " error message: " + err);
           }
       }
   }
   
   public void track(String taskId) throws Exception {
       AtomicInteger logN = new AtomicInteger();
       String pid;
       log.info("Remote shell task log:");
       TaskOutputParameterParser taskOutputParameterParser = new TaskOutputParameterParser();
       do {
           pid = getTaskPid(taskId);
           int lastLogN = logN.get();  // 👈
           String trackCommand = String.format(COMMAND.TRACK_COMMAND, logN.get() + 1, getRemoteShellHome(), taskId);
           // 👇
           runRemote(trackCommand, in -> {
               try (InputStreamReader inReader = new InputStreamReader(in);
                       BufferedReader reader = new BufferedReader(inReader)
               ) {
                   String logLine;
                   while ((logLine = reader.readLine()) != null) {
                       logN.incrementAndGet();
                       log.info(logLine);
                       taskOutputParameterParser.appendParseLog(logLine);
                   }
               } catch (IOException e) {
                   // do sth ...
               }
           });
           if (lastLogN == logN.get()) {
               Thread.sleep(TRACK_INTERVAL);
           }
       } while (StringUtils.isNotEmpty(pid));
       taskOutputParams.putAll(taskOutputParameterParser.getTaskOutputParams());
   }
   ```
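   
   The piped-stream idea can be tried in isolation with a self-contained sketch that uses only `java.io`. The producer thread below is a stand-in for the SSH channel and is an assumption, as are the names `PipedLogSketch`, `startProducer` and `countStreamedLines`; none of them exist in DolphinScheduler. Two points carry over to the proposal: `readLine()` returns `null` only after the writing side closes the `PipedOutputStream`, and the reader and writer must run on different threads, because the pipe buffer is bounded and a single thread doing both can block forever.
   
   ```java
   import java.io.BufferedReader;
   import java.io.IOException;
   import java.io.InputStreamReader;
   import java.io.OutputStream;
   import java.io.PipedInputStream;
   import java.io.PipedOutputStream;
   import java.io.UncheckedIOException;
   import java.nio.charset.StandardCharsets;
   
   final class PipedLogSketch {
   
       /** Hypothetical stand-in for the remote channel: writes lines, then closes the pipe. */
       static Thread startProducer(OutputStream out, int lines) {
           Thread producer = new Thread(() -> {
               // Closing the stream is what lets the reader see end-of-stream.
               try (OutputStream o = out) {
                   for (int i = 1; i <= lines; i++) {
                       o.write(("line " + i + "\n").getBytes(StandardCharsets.UTF_8));
                   }
               } catch (IOException e) {
                   throw new UncheckedIOException(e);
               }
           }, "producer");
           producer.start();
           return producer;
       }
   
       /** Reads the piped output line by line; only one line is held in memory at a time. */
       static int countStreamedLines(int lines) {
           int count = 0;
           try {
               PipedInputStream in = new PipedInputStream(8192); // bounded buffer
               PipedOutputStream out = new PipedOutputStream(in);
               Thread producer = startProducer(out, lines);
               try (BufferedReader reader =
                       new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
                   while (reader.readLine() != null) {
                       count++; // a real consumer would log or parse the line here
                   }
               }
               producer.join();
           } catch (IOException e) {
               throw new UncheckedIOException(e);
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               throw new IllegalStateException(e);
           }
           return count;
       }
   
       public static void main(String[] args) {
           System.out.println(countStreamedLines(10_000));
       }
   }
   ```
   
   In the proposed `runRemote`, `handlerOut.accept(in)` is called on the calling thread before `waitFor`, so it relies on the SSH client writing to `out` from another thread and closing it when the command finishes. That behavior is assumed here and is worth confirming before adopting the change.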
   
   ### Are you willing to submit a PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
   

