gnehil opened a new pull request, #305:
URL: https://github.com/apache/doris-spark-connector/pull/305

   # Proposed changes
   
   Issue Number: close #xxx
   
   ## Problem Summary:
   
   If the stream load http request ends before a batch of data is sent for some 
reason, the writer thread will be stuck. If you get the thread stack through 
jstack, you will see the following situation:
   
   ```java
   "stream-load-worker-0" #58 daemon prio=5 os_prio=31 tid=0x00000001150c9000 
nid=0xaf03 waiting on condition [0x000000030479e000]
      java.lang.Thread.State: WAITING (parking)
           at sun.misc.Unsafe.park(Native Method)
           - parking to wait for  <0x00000007bba4c428> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
           at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
           at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2044)
           at 
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
           at 
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
           at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
           at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
           at java.lang.Thread.run(Thread.java:750)
   
   "Executor task launch worker for task 0.0 in stage 0.0 (TID 0)" #57 daemon 
prio=5 os_prio=31 tid=0x00000001606a5000 nid=0x5307 in Object.wait() 
[0x0000000304591000]
      java.lang.Thread.State: TIMED_WAITING (on object monitor)
           at java.lang.Object.wait(Native Method)
           at java.io.PipedInputStream.awaitSpace(PipedInputStream.java:273)
           at java.io.PipedInputStream.receive(PipedInputStream.java:231)
           - locked <0x00000007bb972cc8> (a java.io.PipedInputStream)
           at java.io.PipedOutputStream.write(PipedOutputStream.java:149)
           at java.io.OutputStream.write(OutputStream.java:75)
           at 
org.apache.doris.spark.client.write.AbstractStreamLoadProcessor.load(AbstractStreamLoadProcessor.java:165)
           at 
org.apache.doris.spark.write.DorisDataWriter.$anonfun$loadBatchWithRetries$1(DorisDataWriter.scala:110)
           at 
org.apache.doris.spark.write.DorisDataWriter$$Lambda$2433/1537409281.apply$mcV$sp(Unknown
 Source)
           at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
           at scala.util.Try$.apply(Try.scala:213)
           at org.apache.doris.spark.util.Retry$.exec(Retry.scala:34)
           at 
org.apache.doris.spark.write.DorisDataWriter.loadBatchWithRetries(DorisDataWriter.scala:111)
           at 
org.apache.doris.spark.write.DorisDataWriter.write(DorisDataWriter.scala:54)
           at 
org.apache.doris.spark.write.DorisDataWriter.write(DorisDataWriter.scala:33)
           at 
org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.write(WriteToDataSourceV2Exec.scala:493)
           at 
org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.$anonfun$run$1(WriteToDataSourceV2Exec.scala:448)
           at 
org.apache.spark.sql.execution.datasources.v2.WritingSparkTask$$Lambda$2429/584782873.apply(Unknown
 Source)
           at 
org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1397)
           at 
org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:486)
           at 
org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:425)
           at 
org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:491)
           at 
org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:388)
           at 
org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec$$Lambda$2427/1767225268.apply(Unknown
 Source)
           at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
           at 
org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
           at org.apache.spark.scheduler.Task.run(Task.scala:141)
           at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
           at 
org.apache.spark.executor.Executor$TaskRunner$$Lambda$2389/1439152090.apply(Unknown
 Source)
           at 
org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
           at 
org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
           at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
           at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
           at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
           at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
           at java.lang.Thread.run(Thread.java:750)
   ```
   
   Since the http request has ended, the data written to the output stream 
buffer will not be read, so the main thread will be stuck here.
   Therefore, a flag is added to indicate whether the batch import is stopped 
normally. If not and the http request has ended, the main thread will be 
interrupted and an exception will be thrown.
   
   ## Checklist(Required)
   
   1. Does it affect the original behavior: (Yes/No/I Don't know)
   2. Has unit tests been added: (Yes/No/No Need)
   3. Has document been added or modified: (Yes/No/No Need)
   4. Does it need to update dependencies: (Yes/No)
   5. Are there any changes that cannot be rolled back: (Yes/No)
   
   ## Further comments
   
   If this is a relatively large or complex change, kick off the discussion at 
[d...@doris.apache.org](mailto:d...@doris.apache.org) by explaining why you 
chose the solution you did and what alternatives you considered, etc...
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to