[
https://issues.apache.org/jira/browse/FLINK-35321?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18058085#comment-18058085
]
Piotr Rudnicki edited comment on FLINK-35321 at 2/12/26 12:46 PM:
------------------------------------------------------------------
Same here on 1.20.2.
I wonder whether we could register the metric in {{CommitterOperator#snapshotState}} (see
[~mosinnik]'s comment above). To do so, one could use the
{{getCheckpointCommittablesUpTo}} method:
{code:java}
committableCollector.getCheckpointCommittablesUpTo(context.getCheckpointId());
{code}
With these checkpoint committables available, it would be possible to implement
{{CommittableCollector#getNumPending}} at the {{CommitterOperator}} level.
Here is my attempt: [https://github.com/apache/flink/pull/27597]
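To illustrate the idea, here is a minimal, self-contained sketch in plain Java with no Flink dependency. The classes {{CheckpointEntry}}, {{Collector}} and {{Operator}} are hypothetical stand-ins, not Flink's actual classes, and the counting logic is only an assumption about what an operator-level {{getNumPending}} could look like. It sums the pending committables of all checkpoints up to a given checkpoint id at snapshot time and exposes the result through a gauge that is created once.

```java
import java.util.TreeMap;
import java.util.function.Supplier;

class PendingCommittablesSketch {

    /** Hypothetical stand-in for the committables of a single checkpoint. */
    static final class CheckpointEntry {
        private int pending;

        CheckpointEntry(int pending) {
            this.pending = pending;
        }

        int getNumPending() {
            return pending;
        }

        void commitAll() {
            pending = 0;
        }
    }

    /** Hypothetical stand-in for a collector keyed by checkpoint id. */
    static final class Collector {
        private final TreeMap<Long, CheckpointEntry> byCheckpoint = new TreeMap<>();

        void add(long checkpointId, int numCommittables) {
            byCheckpoint.put(checkpointId, new CheckpointEntry(numCommittables));
        }

        void commit(long checkpointId) {
            CheckpointEntry entry = byCheckpoint.get(checkpointId);
            if (entry != null) {
                entry.commitAll();
            }
        }

        /** Sums pending committables over all checkpoints with id <= checkpointId. */
        int getNumPending(long checkpointId) {
            int total = 0;
            for (CheckpointEntry entry : byCheckpoint.headMap(checkpointId, true).values()) {
                total += entry.getNumPending();
            }
            return total;
        }
    }

    /** Hypothetical operator: the gauge is created once and only reads a field. */
    static final class Operator {
        final Collector collector = new Collector();
        private volatile int lastPending;
        final Supplier<Integer> pendingGauge = () -> lastPending;

        /** Refreshes the value behind the gauge; does not register anything. */
        void snapshotState(long checkpointId) {
            lastPending = collector.getNumPending(checkpointId);
        }
    }

    public static void main(String[] args) {
        Operator op = new Operator();
        op.collector.add(1L, 2);
        op.collector.add(2L, 3);
        op.collector.add(3L, 4);

        op.snapshotState(2L);
        System.out.println(op.pendingGauge.get()); // prints 5

        op.collector.commit(1L);
        op.snapshotState(3L);
        System.out.println(op.pendingGauge.get()); // prints 7
    }
}
```

In this sketch the gauge object never changes after construction; only the value it reads is updated on each snapshot.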
> CheckpointCommittableManagerImpl re-registers pendingCommittables gauge on
> every commit operation
> -------------------------------------------------------------------------------------------------
>
> Key: FLINK-35321
> URL: https://issues.apache.org/jira/browse/FLINK-35321
> Project: Flink
> Issue Type: Bug
> Components: API / Core, Runtime / Checkpointing, Runtime / Metrics
> Affects Versions: 1.19.0
> Reporter: Lennon Yu
> Assignee: Marton Balassi
> Priority: Major
> Labels: pull-request-available
>
> Found while testing a home-made Sink implementation that implements
> SupportsCommitter. We observed that, starting from the *second* checkpoint,
> every commit by the committer is accompanied by the following warning:
> {quote}
> Name collision: Group already contains a Metric with the name
> 'pendingCommittables'.
> {quote}
> Enabling the debugger and tracing the origin of this log took us to
> {{org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl}}
> at line 137 of the commit() method:
> {code:java}
> metricGroup.setCurrentPendingCommittablesGauge(() ->
> getPendingRequests(false).size());
> {code}
> It looks like, instead of modifying the value of the gauge, the manager
> class is *registering a new gauge* on every commit operation, which
> explains the warning shown above.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)