featzhang created FLINK-39388:
---------------------------------
Summary: Fix Flaky DataGeneratorSourceITCase#testGatedRateLimiter
Key: FLINK-39388
URL: https://issues.apache.org/jira/browse/FLINK-39388
Project: Flink
Issue Type: Bug
Components: Table SQL / API
Affects Versions: 2.0.2
Reporter: featzhang
{{DataGeneratorSourceITCase#testGatedRateLimiter}} is a flaky test due to a
race condition in the {{FirstCheckpointFilter}} inner class.
----
h2. Description
The integration test {{DataGeneratorSourceITCase#testGatedRateLimiter}} fails
intermittently in CI (observed in build
[#73815|https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=73815]
triggered by PR [#27351|https://github.com/apache/flink/pull/27351]).
h3. Root Cause
The test uses an inner class {{FirstCheckpointFilter}} to collect only the
elements emitted before the first checkpoint. In the original implementation,
{{FirstCheckpointFilter}} stops collecting inside {{snapshotState()}}:
{code:java}
// BEFORE (buggy)
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    firstCheckpoint = false; // stops collecting immediately on snapshot
}
{code}
This creates a race condition:
# {{GatedRateLimiter}} completes its first cycle immediately (by design) and
emits {{capacityPerCheckpoint = 8}} elements without waiting for a checkpoint.
# These 8 elements travel through the pipeline network to reach
{{FirstCheckpointFilter}}.
# If the checkpoint barrier arrives at {{FirstCheckpointFilter}} *before* all 8
elements have been processed, {{snapshotState()}} is called prematurely and
{{firstCheckpoint}} is set to {{false}}.
# Subsequent elements are dropped, so the final result contains fewer than 8
elements.
# The assertion {{assertThat(results).hasSize(capacityPerCheckpoint)}} fails.
The failure is non-deterministic: whether it occurs depends on thread
scheduling, network latency, and system load in the CI environment, which is
why the test is flaky rather than consistently failing.
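The interleaving described above can be replayed deterministically outside Flink. The following is a minimal plain-Java sketch, not the actual test code: {{SnapshotCutoffModel}} and {{replay}} are hypothetical names, and the point at which the barrier arrives is passed in explicitly instead of being decided by the scheduler.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of the snapshot-based cutoff (hypothetical, not the Flink class). */
class SnapshotCutoffModel {
    private boolean firstCheckpoint = true;
    private final List<Long> collected = new ArrayList<>();

    /** Mirrors the filter: forward the element only while the cutoff has not fired. */
    void onElement(long value) {
        if (firstCheckpoint) {
            collected.add(value);
        }
    }

    /** Mirrors the buggy snapshotState(): stop collecting as soon as the snapshot runs. */
    void snapshotState() {
        firstCheckpoint = false;
    }

    /**
     * Feeds capacityPerCheckpoint elements into the filter and triggers the snapshot
     * after elementsBeforeBarrier of them. Returns how many elements were collected.
     */
    static int replay(int capacityPerCheckpoint, int elementsBeforeBarrier) {
        SnapshotCutoffModel filter = new SnapshotCutoffModel();
        for (int i = 0; i < capacityPerCheckpoint; i++) {
            if (i == elementsBeforeBarrier) {
                filter.snapshotState(); // barrier reaches the filter mid-cycle
            }
            filter.onElement(i);
        }
        if (elementsBeforeBarrier >= capacityPerCheckpoint) {
            filter.snapshotState(); // barrier reaches the filter after the whole cycle
        }
        return filter.collected.size();
    }

    public static void main(String[] args) {
        System.out.println(replay(8, 8)); // prints 8: barrier after all elements
        System.out.println(replay(8, 5)); // prints 5: barrier after 5 elements, 3 dropped
    }
}
```

In this model the second call corresponds to the failing CI run: the size assertion would see 5 elements instead of 8.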
h3. Fix
Replace the {{snapshotState}}-based cutoff with a
{{notifyCheckpointComplete}}-based cutoff by having {{FirstCheckpointFilter}}
implement {{CheckpointListener}}. The {{notifyCheckpointComplete}} callback is
invoked only after the checkpoint has been acknowledged by all tasks, which
guarantees that all elements emitted in the same checkpoint cycle have already
been processed downstream.
{code:java}
// AFTER (fixed)
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    // Record the ID of the first checkpoint so we can stop collecting when it completes.
    if (firstCheckpointId == Long.MIN_VALUE) {
        firstCheckpointId = context.getCheckpointId();
    }
}

@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
    // Stop collecting elements once the first checkpoint has completed.
    if (checkpointId >= firstCheckpointId && firstCheckpointId != Long.MIN_VALUE) {
        firstCheckpointCompleted = true;
    }
}
{code}
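To illustrate why the completion-based cutoff is insensitive to when the snapshot runs, here is a minimal plain-Java sketch of the same logic without any Flink dependency. {{CompletionCutoffModel}} and {{replay}} are hypothetical names, and the model assumes, as the fix description states, that the completion notification arrives only after the first cycle's elements have passed through the filter.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of the completion-based cutoff (hypothetical, not the Flink class). */
class CompletionCutoffModel {
    private long firstCheckpointId = Long.MIN_VALUE;
    private boolean firstCheckpointCompleted = false;
    private final List<Long> collected = new ArrayList<>();

    /** Forward the element until the first checkpoint has completed. */
    void onElement(long value) {
        if (!firstCheckpointCompleted) {
            collected.add(value);
        }
    }

    /** Only remember which checkpoint was first; do not stop collecting yet. */
    void snapshotState(long checkpointId) {
        if (firstCheckpointId == Long.MIN_VALUE) {
            firstCheckpointId = checkpointId;
        }
    }

    /** Stop collecting once the first checkpoint (or a later one) has completed. */
    void notifyCheckpointComplete(long checkpointId) {
        if (firstCheckpointId != Long.MIN_VALUE && checkpointId >= firstCheckpointId) {
            firstCheckpointCompleted = true;
        }
    }

    /**
     * Feeds one cycle of elements with the snapshot triggered after elementsBeforeBarrier
     * of them, then completes checkpoint 1 and feeds a second cycle of the same size.
     * Returns how many elements were collected.
     */
    static int replay(int capacityPerCheckpoint, int elementsBeforeBarrier) {
        CompletionCutoffModel filter = new CompletionCutoffModel();
        for (int i = 0; i < capacityPerCheckpoint; i++) {
            if (i == elementsBeforeBarrier) {
                filter.snapshotState(1L); // snapshot may run mid-cycle
            }
            filter.onElement(i);
        }
        if (elementsBeforeBarrier >= capacityPerCheckpoint) {
            filter.snapshotState(1L); // snapshot runs after the whole cycle
        }
        filter.notifyCheckpointComplete(1L); // assumed to arrive after the first cycle
        for (int i = 0; i < capacityPerCheckpoint; i++) {
            filter.onElement(capacityPerCheckpoint + i); // second cycle: dropped
        }
        return filter.collected.size();
    }

    public static void main(String[] args) {
        System.out.println(replay(8, 8)); // prints 8
        System.out.println(replay(8, 5)); // prints 8: an early snapshot no longer matters
    }
}
```

Under that assumption the collected count equals {{capacityPerCheckpoint}} wherever the snapshot falls within the first cycle, and elements from the second cycle are still excluded.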
----
h2. Steps to Reproduce
Run {{DataGeneratorSourceITCase#testGatedRateLimiter}} repeatedly under load or
in a slow CI environment. The test will occasionally fail with:
{noformat}
org.opentest4j.AssertionFailedError:
expected: a collection with size 8
but was: a collection with size <N> (N < 8)
{noformat}
----
h2. Expected Behavior
The test passes consistently regardless of checkpoint timing.
----
h2. Actual Behavior
The test fails intermittently when the checkpoint barrier reaches
{{FirstCheckpointFilter}} before all upstream elements have been processed.
----
h2. Environment
- *Flink Version*: 2.0 (master)
- *Java Version*: 17
- *OS*: Linux (CI)