JNSimba opened a new pull request, #539:
URL: https://github.com/apache/doris-flink-connector/pull/539

   # Proposed changes
   
   Issue Number: close #xxx
   
   ## Problem Summary:
   
   
   When the HTTP thread fails, the main thread cannot perceive the exception, 
which may cause writes to block. One workaround is to reduce the 
`sink.check-interval` time.
   This PR handles the case where the check interval is disabled (or set to a 
relatively large value).
   
   taskmanager log:
   ```
   2025-01-07 14:05:33,608 INFO  org.apache.doris.flink.sink.writer.RecordBuffer              [] - start buffer data, read queue size 0, write queue size 3
   2025-01-07 14:05:33,608 INFO  org.apache.doris.flink.sink.writer.DorisStreamLoad           [] - table ods_gof_event_ri stream load started for doris_event_test1736229876676_test_xxxx_0_2 on host 1.1.1.1:8030
   2025-01-07 14:05:33,608 INFO  org.apache.doris.flink.sink.writer.DorisStreamLoad           [] - table ods_gof_event_ri start execute load for label doris_event_test1736229876676_test_xxxxx_event_ri_0_2
   2025-01-07 14:06:06,705 INFO  org.apache.http.impl.execchain.RetryExec                     [] - I/O exception (java.net.SocketException) caught when processing request to {}->http://1.1.1.1:8041: Connection timed out (Write failed)
   2025-01-07 14:14:23,968 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Attempting to cancel task doris_event_test: Writer -> doris_event_test: Committer (9/15)#0 (a13ad214aaa851adaea01f944e9b6b61_b6aa5aa3095bbcf2803fa130138b546a_8_0).
   2025-01-07 14:14:23,969 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - doris_event_test: Writer -> doris_event_test: Committer (9/15)#0 (a13ad214aaa851adaea01f944e9b6b61_b6aa5aa3095bbcf2803fa130138b546a_8_0) switched from RUNNING to CANCELING.
   ```
   
   jstack
   ```
   "Map -> Sink: Writer -> Sink: Committer (10/16)#90" #18020 prio=5 os_prio=0 tid=0x00007f1054033800 nid=0x1058dc waiting on condition [0x00007f1200bc3000]
      java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00007f1c1ba70080> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:403)
        at org.apache.doris.flink.sink.writer.RecordBuffer.write(RecordBuffer.java:93)
        at org.apache.doris.flink.sink.writer.RecordStream.write(RecordStream.java:63)
        at org.apache.doris.flink.sink.writer.DorisStreamLoad.writeRecord(DorisStreamLoad.java:258)
        at org.apache.doris.flink.sink.writer.DorisWriter.writeOneDorisRecord(DorisWriter.java:216)
        at org.apache.doris.flink.sink.writer.DorisWriter.write(DorisWriter.java:186)
        at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158)
        at org.apache.flink.streaming.runtime.io.RecordProcessorUtils$$Lambda$2007/798273157.accept(Unknown Source)
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
        at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
        at org.apache.flink.streaming.runtime.io.RecordProcessorUtils.lambda$getRecordProcessor$0(RecordProcessorUtils.java:60)
        at org.apache.flink.streaming.runtime.io.RecordProcessorUtils$$Lambda$2190/34063893.accept(Unknown Source)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237)
        at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146)
        at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$699/800650307.runDefaultAction(Unknown Source)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
        at org.apache.flink.runtime.taskmanager.Task$$Lambda$1462/326797702.run(Unknown Source)
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
        at java.lang.Thread.run(Thread.java:750)
   ```
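
The stack trace shows the writer thread parked in `ArrayBlockingQueue.take()` inside `RecordBuffer.write`, waiting for a buffer that the failed HTTP thread will never hand back. One general way to avoid this kind of hang is to replace the unbounded `take()` with a timed `poll()` and re-check a shared failure flag on every wake-up. The sketch below illustrates that pattern only; it is not taken from this PR's diff, and the class `FailFastBuffer` and its methods `reportFailure`, `recycle`, and `takeEmptyBuffer` are hypothetical names, not connector APIs.

```java
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical illustration; not the connector's RecordBuffer. */
final class FailFastBuffer {
    // Free buffers handed back by the HTTP (load) thread.
    private final BlockingQueue<byte[]> emptyBuffers = new ArrayBlockingQueue<>(3);
    // First failure reported by the HTTP thread, or null if none so far.
    private final AtomicReference<Throwable> loadFailure = new AtomicReference<>();

    /** Called by the HTTP thread when the load request fails. */
    void reportFailure(Throwable t) {
        loadFailure.compareAndSet(null, t);
    }

    /** Called by the HTTP thread after it has drained a buffer. Returns false if the queue is full. */
    boolean recycle(byte[] buffer) {
        return emptyBuffers.offer(buffer);
    }

    /**
     * Called by the writer thread. Waits for a free buffer, but wakes up
     * every pollMillis to check whether the HTTP thread has already failed.
     */
    byte[] takeEmptyBuffer(long pollMillis) throws IOException, InterruptedException {
        while (true) {
            Throwable failure = loadFailure.get();
            if (failure != null) {
                throw new IOException("stream load failed", failure);
            }
            byte[] buffer = emptyBuffers.poll(pollMillis, TimeUnit.MILLISECONDS);
            if (buffer != null) {
                return buffer;
            }
        }
    }
}
```

With this shape, the writer thread notices a failure within roughly one poll interval even when no periodic checker is running. The trade-off is a periodic wake-up on an otherwise idle writer, so the interval should not be set too small.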
   
   
   
   
   ## Checklist(Required)
   
   1. Does it affect the original behavior: (Yes/No/I Don't know)
   2. Have unit tests been added: (Yes/No/No Need)
   3. Has documentation been added or modified: (Yes/No/No Need)
   4. Does it need to update dependencies: (Yes/No)
   5. Are there any changes that cannot be rolled back: (Yes/No)
   
   ## Further comments
   
   If this is a relatively large or complex change, kick off the discussion at 
[d...@doris.apache.org](mailto:d...@doris.apache.org) by explaining why you 
chose the solution you did and what alternatives you considered, etc...
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

