[ 
https://issues.apache.org/jira/browse/FLINK-35321?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18058085#comment-18058085
 ] 

Piotr Rudnicki edited comment on FLINK-35321 at 2/12/26 9:34 AM:
-----------------------------------------------------------------

Same here, on 1.20.2.

I wonder if we could register the metric in `CommitterOperator#snapshotState` (see 
[~mosinnik]'s comment above). To do so, one could use the 
`getCheckpointCommittablesUpTo` method:
{code:java}
committableCollector.getCheckpointCommittablesUpTo(context.getCheckpointId());{code}
Then, with these checkpoint committables, it would be possible to implement 
`CommittableCollector#getNumPending` at the `CommitterOperator` level.
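
A rough sketch of the idea, in case it helps the discussion: register the gauge exactly once and only update the value it reads at snapshot time. Everything below is a hypothetical stand-in (`ToyMetricGroup`, `ToyCommitterOperator`, and the `snapshotState(int)` signature are assumptions made for illustration, not Flink classes or signatures); it only shows the register-once pattern, not a proposed patch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntSupplier;

public class PendingGaugeSketch {

    /** Hypothetical stand-in for a metric group; NOT Flink's MetricGroup. */
    static final class ToyMetricGroup {
        private final Map<String, IntSupplier> gauges = new HashMap<>();
        int collisions = 0;

        /** Registers a gauge; a second registration under the same name counts as a collision. */
        void gauge(String name, IntSupplier supplier) {
            if (gauges.containsKey(name)) {
                collisions++; // this toy keeps the first gauge and just counts the collision
                return;
            }
            gauges.put(name, supplier);
        }

        int read(String name) {
            return gauges.get(name).getAsInt();
        }
    }

    /** Hypothetical operator: registers the gauge once, then only updates the backing value. */
    static final class ToyCommitterOperator {
        private final AtomicInteger numPending = new AtomicInteger();

        ToyCommitterOperator(ToyMetricGroup metrics) {
            // Single registration; the gauge reads the counter lazily on every poll.
            metrics.gauge("pendingCommittables", numPending::get);
        }

        /** Assumed hook: called per checkpoint with the number of still-pending committables. */
        void snapshotState(int pendingUpToCheckpoint) {
            numPending.set(pendingUpToCheckpoint);
        }
    }

    public static void main(String[] args) {
        ToyMetricGroup metrics = new ToyMetricGroup();
        ToyCommitterOperator operator = new ToyCommitterOperator(metrics);

        operator.snapshotState(3);
        System.out.println("pending: " + metrics.read("pendingCommittables"));

        operator.snapshotState(0);
        System.out.println("pending: " + metrics.read("pendingCommittables"));
        System.out.println("name collisions: " + metrics.collisions);
    }
}
```

In the real operator the pending count would presumably be derived from the committables returned by `getCheckpointCommittablesUpTo`, which the sketch replaces with a plain integer argument.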


> CheckpointCommittableManagerImpl re-registers pendingCommittables gauge on 
> every commit operation
> -------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-35321
>                 URL: https://issues.apache.org/jira/browse/FLINK-35321
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Core, Runtime / Checkpointing, Runtime / Metrics
>    Affects Versions: 1.19.0
>            Reporter: Lennon Yu
>            Assignee: Marton Balassi
>            Priority: Major
>
> Found while testing a home-made Sink implementation that implements 
> SupportsCommitter. We observed that, starting from the *second* checkpoint, 
> every committer commit is accompanied by the following warning log:
> {quote}
> Name collision: Group already contains a Metric with the name 
> 'pendingCommittables'.
> {quote}
> Enabling the debugger and tracing the origin of this log took us to 
> {{org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl}}
>  at line 137 of the commit() method:
> {code:java}
> metricGroup.setCurrentPendingCommittablesGauge(() -> 
> getPendingRequests(false).size());
> {code}
> It looks like, instead of modifying the value of the gauge, the manager 
> class is *re-registering a different gauge* on every commit operation, which 
> explains the appearance of the warning log shown above.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
