[ 
https://issues.apache.org/jira/browse/FLINK-35321?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18058085#comment-18058085
 ] 

Piotr Rudnicki edited comment on FLINK-35321 at 2/12/26 12:46 PM:
------------------------------------------------------------------

same here, 1.20.2

I wonder if we could register metric at {{CommiterOperator#snapshotState}} (see 
[~mosinnik] comment above). For doing so one may use 
g{{{}etCheckpointsCommitablesUpTo{}}} method:
{code:java}
committableCollector.getCheckpointCommittablesUpTo(context.getCheckpointId());{code}
Then, having these checkpoint commitables there is possibility to imlement 
{{CommitableCollector#getNumPending}} at the {{CommiterOperator}} level.

Here is my attempt: [https://github.com/apache/flink/pull/27597]


was (Author: JIRAUSER312406):
same here, 1.20.2

I wonder if we could register metric at {{CommiterOperator#snapshotState}} (see 
[~mosinnik] comment above). For doing so one may use 
g{{{}etCheckpointsCommitablesUpTo{}}} method:
{code:java}
committableCollector.getCheckpointCommittablesUpTo(context.getCheckpointId());{code}
Then, having these checkpoint commitables there is possibility to imlement 
{{CommitableCollector#getNumPending}} at the {{CommiterOperator}} level.

 

Here is my attempt: https://github.com/apache/flink/pull/27597

> CheckpointCommittableManagerImpl re-registers pendingCommittables gauge on 
> every commit operation
> -------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-35321
>                 URL: https://issues.apache.org/jira/browse/FLINK-35321
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Core, Runtime / Checkpointing, Runtime / Metrics
>    Affects Versions: 1.19.0
>            Reporter: Lennon Yu
>            Assignee: Marton Balassi
>            Priority: Major
>              Labels: pull-request-available
>
> Found while testing a home-made Sink implementation that implements 
> SupportCommitter. We observed that starting from the *second* checkpoint, 
> every committer commit will be accompanied by the warning log:
> {quote}
> Name collision: Group already contains a Metric with the name 
> 'pendingCommittables'.
> {quote}
> Enabling the debugger and tracing the origin of this log took us to 
> {{org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl}}
>  at line 137 of the commit() method:
> {code:java}
> metricGroup.setCurrentPendingCommittablesGauge(() -> 
> getPendingRequests(false).size());
> {code}
> It looks like that instead of modifying the value of the gauge, the manager 
> class is *re-setting with a different guage* on every commit operation, which 
> explains the appearance of the warning log shown above.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to