JNSimba opened a new pull request, #331:
URL: https://github.com/apache/doris-spark-connector/pull/331

   # Proposed changes
   
   If `doris.sink.max-retries` is configured, the Spark executor may get stuck when the FE crashes.
   As the jstack below shows, the executor task thread keeps writing to the pipe while the stream-load thread that consumes it has already exited; once the pipe buffer fills up, the writer blocks indefinitely. A minimal repro sketch follows the trace.
   
   ```java
   "stream-load-worker-0" #55 daemon prio=5 os_prio=31 tid=0x0000000143b9c800 nid=0xfd03 waiting on condition [0x0000000177b56000]
      java.lang.Thread.State: WAITING (parking)
           at sun.misc.Unsafe.park(Native Method)
           - parking to wait for  <0x0000000776ea8260> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
           at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
           at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2044)
           at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
           at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
           at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
           at java.lang.Thread.run(Thread.java:750)

   "Executor task launch worker for task 0.0 in stage 0.0 (TID 0)" #54 daemon prio=5 os_prio=31 tid=0x0000000125c6b000 nid=0x4307 in Object.wait() [0x0000000177949000]
      java.lang.Thread.State: TIMED_WAITING (on object monitor)
           at java.lang.Object.wait(Native Method)
           at java.io.PipedInputStream.awaitSpace(PipedInputStream.java:273)
           at java.io.PipedInputStream.receive(PipedInputStream.java:231)
           - locked <0x0000000776e999e8> (a java.io.PipedInputStream)
           at java.io.PipedOutputStream.write(PipedOutputStream.java:149)
           at java.io.OutputStream.write(OutputStream.java:75)
           at org.apache.doris.spark.client.write.AbstractStreamLoadProcessor.writeTo(AbstractStreamLoadProcessor.java:475)
           at org.apache.doris.spark.client.write.AbstractStreamLoadProcessor.load(AbstractStreamLoadProcessor.java:150)
           at org.apache.doris.spark.write.DorisDataWriter.$anonfun$loadBatchWithRetries$1(DorisDataWriter.scala:110)
           at org.apache.doris.spark.write.DorisDataWriter$$Lambda$2334/1771533597.apply$mcV$sp(Unknown Source)
           at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
           at scala.util.Try$.apply(Try.scala:213)
           at org.apache.doris.spark.util.Retry$.exec(Retry.scala:34)
           at org.apache.doris.spark.write.DorisDataWriter.loadBatchWithRetries(DorisDataWriter.scala:111)
           at org.apache.doris.spark.write.DorisDataWriter.write(DorisDataWriter.scala:54)
           at org.apache.doris.spark.write.DorisDataWriter.write(DorisDataWriter.scala:33)
           at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.write(WriteToDataSourceV2Exec.scala:493)
           at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.$anonfun$run$1(WriteToDataSourceV2Exec.scala:448)
           at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask$$Lambda$2330/1720488.apply(Unknown Source)
           at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1397)
           at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:486)
           at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:425)
           at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:491)
           at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:388)
           at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec$$Lambda$2327/1706417075.apply(Unknown Source)
           at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
           at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
   ```
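   
   For illustration, here is a minimal standalone sketch of why the writer parks forever (hypothetical code, not connector code; the class name `PipeHangRepro` and all constants are made up). The key JDK behavior: `PipedInputStream` only fails the write with "Read end dead" when the reader *thread* has died, but the stream-load worker is a pooled thread that stays alive in `LinkedBlockingQueue.take()` after its task exits, so a full pipe never unblocks.
   
   ```java
   import java.io.PipedInputStream;
   import java.io.PipedOutputStream;
   import java.util.concurrent.CountDownLatch;
   
   /** Hangs by design: reproduces the pipe-full deadlock from the jstack. */
   public class PipeHangRepro {
   
       public static void main(String[] args) throws Exception {
           // Small buffer so the pipe fills quickly.
           PipedInputStream in = new PipedInputStream(1024);
           PipedOutputStream out = new PipedOutputStream(in);
           CountDownLatch stoppedReading = new CountDownLatch(1);
   
           // Consumer that reads once and then idles forever WITHOUT dying,
           // like a pooled stream-load worker parked in the pool's queue
           // after its task has exited.
           new Thread(() -> {
               try {
                   in.read(new byte[16]);        // consume a little, then stop
                   stoppedReading.countDown();
                   Thread.sleep(Long.MAX_VALUE); // stay alive, never read again
               } catch (Exception ignored) {
               }
           }).start();
   
           out.write(new byte[16]);
           stoppedReading.await();
   
           // PipedInputStream only throws "Read end dead" if the reader
           // THREAD has died; a live-but-idle reader passes that check, so
           // once the buffer is full, write() loops in awaitSpace() with
           // wait(1000), the TIMED_WAITING state of the executor thread above.
           byte[] chunk = new byte[256];
           while (true) {
               out.write(chunk);
           }
       }
   }
   ```
   
   Because the pipe itself cannot detect a live-but-idle consumer, the writer side has to observe the stream-load thread's exit explicitly and fail the write instead of blocking, which is the class of hang this change guards against.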
   
   
   ## Checklist (Required)
   
   1. Does it affect the original behavior: (Yes/No/I don't know)
   2. Have unit tests been added: (Yes/No/No Need)
   3. Has documentation been added or modified: (Yes/No/No Need)
   4. Does it need to update dependencies: (Yes/No)
   5. Are there any changes that cannot be rolled back: (Yes/No)
   
   ## Further comments
   
   If this is a relatively large or complex change, kick off the discussion at [[email protected]](mailto:[email protected]) by explaining why you chose the solution you did, what alternatives you considered, etc.
   

