Good morning Daniel,
Another possible reason is backpressure in combination with aligned checkpoints:
* Flink performs checkpoints by sending checkpoint barriers through the job
graph, starting at the source operators and flowing towards the sink operators
* These checkpoint barriers are a kind of meta event that travels along with your
custom events (much like watermarks and latency markers)
* These checkpoint barriers cannot overtake (i.e. go faster than) your custom
events
* In your situation, because it happens right after you start the job:
  * a source might be forwarding many events (e.g. for backfilling) while a
later operator cannot process these events at the same speed
  * the events therefore queue up in front of that operator, and so do the
checkpoint barriers, which consequently take longer to align than the
checkpoint timeout allows
* How to fix this situation:
  * diagnostics: the Flink dashboard has a checkpoints tab that shows how long
checkpoint progress and alignment take for each task/subtask
  * which version of Flink are you using?
  * depending on the Flink version you can enable unaligned checkpoints
(which has some other implications); see the config sketch after this list
  * you could also increase the scale-out factor for the backfill phase and
then lower it again …
* FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold:
this depends on how many tolerable checkpoint failures and which restart
strategy you have configured … (also touched on in the sketch below)
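For example, a minimal sketch (assuming a Flink version, roughly 1.11/1.12 or
later, that exposes these config keys, passed the same way as your other -yD
options; the value 3 is only an illustration, not a recommendation):
-yD "execution.checkpointing.unaligned=true"\
-yD "execution.checkpointing.tolerable-failed-checkpoints=3"\
The first key lets checkpoint barriers overtake queued events so alignment no
longer waits on the backlog; the second raises the number of consecutive
checkpoint failures the job tolerates before it is failed and restarted.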
I might be mistaken; however, this is what I look into when I run into similar
situations.
Feel free to get back to the mailing list for further clarification …
Thias
From: Caizhi Weng <[email protected]>
Sent: Thursday, 2 September 2021 04:24
To: Daniel Vol <[email protected]>
Cc: user <[email protected]>
Subject: Re: Flink restarts on Checkpoint failure
Hi!
There are a ton of possible reasons for a checkpoint failure. The most likely
reasons are:
* The JVM is busy with garbage collection when performing the checkpoint. This
can be checked by looking into the GC logs of the task managers (see the sketch
below for one way to enable them).
* The state suddenly becomes quite large due to some specific data pattern.
This can be checked by looking at the state size for the completed portion of
that checkpoint.
You might also want to profile the CPU usage when the checkpoint is happening.
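A minimal sketch for turning on GC logs on the task managers (assuming a Java 8
runtime, which matches the stack trace below, and passing the option the same
way as the other -yD flags; env.java.opts.taskmanager is Flink's option for
extra task manager JVM arguments):
-yD "env.java.opts.taskmanager=-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"\
Long or frequent GC pauses around the checkpoint trigger times would point to
the first cause above.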
Daniel Vol <[email protected]<mailto:[email protected]>> 于2021年9月1日周三 下午7:08写道:
Hello,
I see the following error in my jobmanager log (Flink on EMR):
Checking the cluster logs I see:
2021-08-21 17:17:30,489 [Checkpoint Timer] INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
checkpoint 1 (type=CHECKPOINT) @ 1629566250303 for job
c513e9ebbea4ab72d80b1338896ca5c2.
2021-08-21 17:17:33,572 [jobmanager-future-thread-5] INFO
com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream
- close closed:false s3://***/_metadata
2021-08-21 17:17:33,800 [jobmanager-future-thread-5] INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
checkpoint 1 for job c513e9ebbea4ab72d80b1338896ca5c2 (737859873 bytes in 3496
ms).
2021-08-21 17:27:30,474 [Checkpoint Timer] INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
checkpoint 2 (type=CHECKPOINT) @ 1629566850302 for job
c513e9ebbea4ab72d80b1338896ca5c2.
2021-08-21 17:27:46,012 [jobmanager-future-thread-3] INFO
com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream
- close closed:false s3://***/_metadata
2021-08-21 17:27:46,158 [jobmanager-future-thread-3] INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
checkpoint 2 for job c513e9ebbea4ab72d80b1338896ca5c2 (1210889410 bytes in
15856 ms).
2021-08-21 17:37:30,468 [Checkpoint Timer] INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
checkpoint 3 (type=CHECKPOINT) @ 1629567450302 for job
c513e9ebbea4ab72d80b1338896ca5c2.
2021-08-21 17:47:30,469 [Checkpoint Timer] INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 3 of
job c513e9ebbea4ab72d80b1338896ca5c2 expired before completing.
2021-08-21 17:47:30,476 [flink-akka.actor.default-dispatcher-34] INFO
org.apache.flink.runtime.jobmaster.JobMaster - Trying to recover from a global
failure.
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable
failure threshold.
at
org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:66)
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1673)
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1650)
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:91)
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:1783)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2021-08-21 17:47:30,478 [flink-akka.actor.default-dispatcher-34] INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph - Job
session-aggregation (c513e9ebbea4ab72d80b1338896ca5c2) switched from state
RUNNING to RESTARTING.
Configuration is:
-yD "execution.checkpointing.timeout=10 min"\
-yD "restart-strategy=failure-rate"\
-yD "restart-strategy.failure-rate.max-failures-per-interval=70"\
-yD "restart-strategy.failure-rate.delay=1 min"\
-yD "restart-strategy.failure-rate.failure-rate-interval=60 min"\
Not sure whether https://issues.apache.org/jira/browse/FLINK-21215 is related -
but it looks like it has already been solved.
I know I can increase the checkpoint timeout - but the checkpoint size is
relatively small and most of the time it takes only several seconds to complete,
so 10 minutes should be more than enough. So the main question is: why was
"Exceeded checkpoint tolerable failure threshold" triggered?
Thanks!