Here’s more detail on the UnsupportedOperation exception. The job starts, the
operator collects some stats, and then the job dies, apparently during a rescale operation:
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] >>> Event | Info |
JOBSTATUSCHANGED | Job status changed from CREATED to RUNNING
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] The derived from fraction
jvm overhead memory (102.400mb (107374184 bytes)) is less than its min value
192.000mb (201326592 bytes), min value will be used instead
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] >>> Status | Info |
STABLE | The resource deployment is considered to be stable and won't
be rolled back
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Resource fully
reconciled, nothing to do...
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] The derived from fraction
jvm overhead memory (102.400mb (107374184 bytes)) is less than its min value
192.000mb (201326592 bytes), min value will be used instead
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Stabilizing until
2024-04-26 19:22:43
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Creating config map
autoscaler-f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] The derived from fraction
jvm overhead memory (102.400mb (107374184 bytes)) is less than its min value
192.000mb (201326592 bytes), min value will be used instead
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] The derived from fraction
jvm overhead memory (1.000gb (1073741840 bytes)) is greater than its max value
1024.000mb (1073741824 bytes), max value will be used instead
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Stabilizing until
2024-04-26 19:22:43
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Resource fully
reconciled, nothing to do...
[ERROR][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Job
f5179d479dc1921693ffeb3797345458 failed with error:
java.util.concurrent.CompletionException:
java.util.concurrent.CompletionException:
java.lang.UnsupportedOperationException: Cannot rescale the given pointwise
partitioner.
Did you change the partitioner to forward or rescale?
It may also help to add an explicit shuffle().
at
org.apache.flink.util.concurrent.FutureUtils.lambda$switchExecutor$23(FutureUtils.java:1305)
at java.base/java.util.concurrent.CompletableFuture.uniHandle(Unknown
Source)
at
java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown
Source)
at java.base/java.util.concurrent.CompletableFuture$Completion.run(Unknown
Source)
at
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRunAsync$4(PekkoRpcActor.java:451)
at
org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
at
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRunAsync(PekkoRpcActor.java:451)
at
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:218)
at
org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
at
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
at
org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
at
org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253)
at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
at
java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown
Source)
at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)
Caused by: java.util.concurrent.CompletionException:
java.lang.UnsupportedOperationException: Cannot rescale the given pointwise
partitioner.
Did you change the partitioner to forward or rescale?
It may also help to add an explicit shuffle().
at
org.apache.flink.runtime.scheduler.adaptive.BackgroundTask.lambda$new$0(BackgroundTask.java:59)
at
java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(Unknown
Source)
at java.base/java.util.concurrent.CompletableFuture$Completion.run(Unknown
Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown
Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown
Source)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.UnsupportedOperationException: Cannot rescale the given
pointwise partitioner.
Did you change the partitioner to forward or rescale?
It may also help to add an explicit shuffle().
at
org.apache.flink.runtime.io.network.api.writer.SubtaskStateMapper$6.getOldSubtasks(SubtaskStateMapper.java:180)
at
org.apache.flink.runtime.io.network.api.writer.SubtaskStateMapper.lambda$getNewToOldSubtasksMapping$0(SubtaskStateMapper.java:202)
at java.base/java.util.stream.IntPipeline$1$1.accept(Unknown Source)
at
java.base/java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Unknown
Source)
at java.base/java.util.Spliterator$OfInt.forEachRemaining(Unknown Source)
at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown
Source)
at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source)
at java.base/java.util.stream.AbstractPipeline.evaluateToArrayNode(Unknown
Source)
at java.base/java.util.stream.ReferencePipeline.toArray(Unknown Source)
at
org.apache.flink.runtime.checkpoint.RescaleMappings.of(RescaleMappings.java:139)
at
org.apache.flink.runtime.io.network.api.writer.SubtaskStateMapper.getNewToOldSubtasksMapping(SubtaskStateMapper.java:198)
at
org.apache.flink.runtime.checkpoint.TaskStateAssignment.getInputMapping(TaskStateAssignment.java:410)
at
org.apache.flink.runtime.checkpoint.StateAssignmentOperation.reDistributeInputChannelStates(StateAssignmentOperation.java:440)
at
org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignAttemptState(StateAssignmentOperation.java:206)
at
org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignStates(StateAssignmentOperation.java:146)
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1822)
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreInitialCheckpointIfPresent(CheckpointCoordinator.java:1742)
at
org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:210)
at
org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler.createExecutionGraphAndRestoreState(AdaptiveScheduler.java:1239)
at
org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler.lambda$createExecutionGraphAndRestoreStateAsync$27(AdaptiveScheduler.java:1229)
at
org.apache.flink.runtime.scheduler.adaptive.BackgroundTask.lambda$new$0(BackgroundTask.java:57)
... 5 more
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] >>> Event | Info |
JOBSTATUSCHANGED | Job status changed from RUNNING to FAILED
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] The derived from fraction
jvm overhead memory (102.400mb (107374184 bytes)) is less than its min value
192.000mb (201326592 bytes), min value will be used instead
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] The derived from fraction
jvm overhead memory (1.000gb (1073741840 bytes)) is greater than its max value
1024.000mb (1073741824 bytes), max value will be used instead
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] >>> Status | Error |
FAILED |
{"type":"org.apache.flink.util.SerializedThrowable","message":"java.util.concurrent.CompletionException:
java.util.concurrent.CompletionException:
java.lang.UnsupportedOperationException: ...
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Resource fully
reconciled, nothing to do...
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Cleaning up
FlinkDeployment
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] >>> Event | Info |
CLEANUP | Cleaning up FlinkDeployment
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] The derived from fraction
jvm overhead memory (102.400mb (107374184 bytes)) is less than its min value
192.000mb (201326592 bytes), min value will be used instead
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] The derived from fraction
jvm overhead memory (1.000gb (1073741840 bytes)) is greater than its max value
1024.000mb (1073741824 bytes), max value will be used instead
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Cleaning up autoscaling
meta data
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Deleting cluster with
Foreground propagation
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Scaling JobManager
Deployment to zero with 300 seconds timeout...
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Completed Scaling
JobManager Deployment to zero
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Deleting JobManager
Deployment with 298 seconds timeout...
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Completed Deleting
JobManager Deployment
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Deleting Kubernetes HA
metadata
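If I read the exception’s hint correctly, the fix would be to break the pointwise (forward/rescale) connection with an explicit shuffle, so the exchange becomes all-to-all and its channel state can be redistributed. A minimal sketch of what that would look like in the DataStream API (the pipeline here is hypothetical, not our actual job):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ShuffleSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("a", "b", "c")
                .map(String::toUpperCase)
                // shuffle() replaces the pointwise forward/rescale
                // connection with an all-to-all exchange, which the
                // checkpoint state mapper can rescale
                .shuffle()
                .map(s -> s + "!")
                .print();

        env.execute("shuffle-sketch");
    }
}
```

(As I understand it, shuffle() distributes records randomly and rebalance() round-robin; either makes the connection all-to-all.)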
Any ideas?
Thanks,
Maxim
From: Gyula Fóra <[email protected]>
Date: Friday, April 26, 2024 at 1:10 AM
To: Maxim Senin <[email protected]>
Cc: Maxim Senin via user <[email protected]>
Subject: Re: [External] Exception during autoscaling operation - Flink
1.18/Operator 1.8.0
Hi Maxim!
Regarding the status update error, it could be related to a problem we
discovered recently with Flink Operator HA, where during a namespace change
both the leader and the follower instances would start processing.
It has been fixed on the current master by updating the JOSDK version to one
containing the fix.
For details you can check:
https://github.com/operator-framework/java-operator-sdk/issues/2341
https://github.com/apache/flink-kubernetes-operator/commit/29a4aa5adf101920cbe1a3a9a178ff16f52c746d
To resolve the issue (if it's caused by this), you could either cherry-pick the
fix internally to the operator or reduce the replicas to 1 if you are using HA.
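For example, if you installed the operator with the Helm chart, reducing to a single replica is just a values override (field name per the upstream chart; verify against your chart version):

```yaml
# values.yaml override for the flink-kubernetes-operator Helm chart
replicas: 1
```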
Cheers,
Gyula
On Fri, Apr 26, 2024 at 3:48 AM Maxim Senin via user
<[email protected]<mailto:[email protected]>> wrote:
I have also seen this exception:
o.a.f.k.o.o.JobStatusObserver
[ERROR][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Job
d0ac9da5959d8cc9a82645eeef6751a5 failed with error:
java.util.concurrent.CompletionException:
java.util.concurrent.CompletionException:
java.lang.UnsupportedOperationException: Cannot rescale the given pointwise
partitioner.
Did you change the partitioner to forward or rescale?
It may also help to add an explicit shuffle().
at
org.apache.flink.util.concurrent.FutureUtils.lambda$switchExecutor$23(FutureUtils.java:1305)
at
java.base/java.util.concurrent.CompletableFuture.uniHandle(Unknown Source)
at
java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown
Source)
at
java.base/java.util.concurrent.CompletableFuture$Completion.run(Unknown Source)
at
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRunAsync$4(PekkoRpcActor.java:451)
at
org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
at
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRunAsync(PekkoRpcActor.java:451)
at
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:218)
at
org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
at
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
at
org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
at
org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
at
org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
I can’t find any information on how to interpret this. Please advise.
Cheers,
Maxim
From: Maxim Senin via user <[email protected]<mailto:[email protected]>>
Date: Thursday, April 25, 2024 at 12:01 PM
To: Maxim Senin via user <[email protected]<mailto:[email protected]>>
Subject: [External] Exception during autoscaling operation - Flink
1.18/Operator 1.8.0
Hi.
I already asked before but never got an answer. My observation is that the
operator, after collecting some stats, tries to restart one of the
deployments. This includes taking a savepoint (`takeSavepointOnUpgrade: true`,
`upgradeMode: savepoint`) and “gracefully” shutting down the JobManager by
“scaling it to zero” (setting replicas = 0 in the newly generated config).
However, the deployment never comes back up, apparently due to this exception:
2024-04-25 17:20:52,920 mi.j.o.p.e.ReconciliationDispatcher
[ERROR][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Error during error status
handling.
org.apache.flink.kubernetes.operator.exception.StatusConflictException: Status
have been modified externally in version 50607043 Previous:
{"jobStatus":{"jobName":"autoscaling
test:attack-surface","jobId":"be93ad9b152c1f11696e971e6a638b63","state":"FINISHEDINFO\\n\"},\"mode\":\"native\"},\"resource_metadata\":…
at
org.apache.flink.kubernetes.operator.utils.StatusRecorder.replaceStatus(StatusRecorder.java:161)
at
org.apache.flink.kubernetes.operator.utils.StatusRecorder.patchAndCacheStatus(StatusRecorder.java:97)
at
org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils.toErrorStatusUpdateControl(ReconciliationUtils.java:438)
at
org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.updateErrorStatus(FlinkDeploymentController.java:209)
at
org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.updateErrorStatus(FlinkDeploymentController.java:57)
at
io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleErrorStatusHandler(ReconciliationDispatcher.java:194)
at
io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:123)
at
io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:91)
at
io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:64)
at
io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:417)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown
Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown
Source)
at java.base/java.lang.Thread.run(Unknown Source)
2024-04-25 17:20:52,925 mi.j.o.p.e.ReconciliationDispatcher
[ERROR][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Error during event
processing ExecutionScope{ resource id:
ResourceID{name='f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6', namespace='flink'},
version: 50606957} failed.
org.apache.flink.kubernetes.operator.exception.ReconciliationException:
org.apache.flink.kubernetes.operator.exception.StatusConflictException: Status
have been modified externally in version 50607043 Previous:
{"jobStatus":{"jobName":"autoscaling
test:attack-surface","jobId":"be93ad9b152c1f11696e971e6a638b63","state":"FINISHED",
Caused by:
org.apache.flink.kubernetes.operator.exception.StatusConflictException: Status
have been modified externally in version 50607043 Previous:
{"jobStatus":{"jobName":"autoscaling
test:attack-surface","jobId":"be93ad9b152c1f11696e971e6a638b63","state":"FINISHED
at
org.apache.flink.kubernetes.operator.utils.StatusRecorder.replaceStatus(StatusRecorder.java:161)
at
org.apache.flink.kubernetes.operator.utils.StatusRecorder.patchAndCacheStatus(StatusRecorder.java:97)
at
org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.deploy(ApplicationReconciler.java:175)
at
org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.deploy(ApplicationReconciler.java:63)
at
org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler.restoreJob(AbstractJobReconciler.java:279)
at
org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler.reconcileSpecChange(AbstractJobReconciler.java:156)
at
org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:171)
at
org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:145)
... 13 more
How can this be fixed? Why does the deployment not come back up after this
exception? Is there a configuration property to set the number of retries?
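For context, the relevant part of our FlinkDeployment spec looks roughly like this (trimmed; only the upgrade-related fields matter here):

```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6
  namespace: flink
spec:
  flinkVersion: v1_18
  job:
    upgradeMode: savepoint   # restore from a savepoint on spec changes
    # takeSavepointOnUpgrade: true is set as mentioned above; check your
    # operator/CRD version for the exact field location
```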
Thanks,
Maxim