Hi Hongshun,

I am not too concerned about the transmission cost, because the full split
transmission already has to happen in the initial assignment phase. In the
future, we will probably also want to introduce some kind of workload
balancing across source readers, e.g. based on the per-split throughput or
the per-source-reader workload in heterogeneous clusters, which would move
full splits between readers as well. For these reasons, we do not expect
the Split objects to be huge, and we are not trying to design for huge
Split objects either, as they would cause problems even today.

Good point on the potential split loss. Please see my replies below:

Scenario 2:


> 1. Reader A reports splits (1 and 2), and Reader B reports (3 and 4) upon
> restart.
> 2. Before the enumerator receives all reports and performs reassignment, a
> checkpoint is triggered.
> 3. Since no splits have been reassigned yet, both readers have empty
> states.
> 4. When restarting from this checkpoint, all four splits are lost.

The reader registration happens in SourceOperator.open(), which means the
task is still in the initializing state. Therefore, a checkpoint should not
be triggered until the enumerator has received all the split reports.

There is a nuance here. Today, the RPC call from the TM to the JM is async,
so it is possible that SourceOperator.open() has returned but the
enumerator has not yet received the split reports. However, because the
task status update RPC call goes through the same channel as the split
report call, the task status RPC call will arrive after the split report
call on the JM side. Therefore, on the JM side, the SourceCoordinator will
always receive the split reports first, and only then the checkpoint
request. This "happens-before" relationship is important for guaranteeing
consistent state between the enumerator and the readers.
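
To illustrate the ordering argument, here is a minimal sketch in plain
Java. It is not Flink code: the single-threaded executor only stands in for
the TM-to-JM RPC channel, and the names OrderedChannelSketch, send(), and
the message strings are hypothetical. It assumes the channel delivers
messages in the order they were sent, which is the property the argument
above relies on.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical model, not Flink code: one FIFO channel carries both message types.
class OrderedChannelSketch {
    // A single-threaded executor runs submitted tasks in submission order.
    private final ExecutorService channel = Executors.newSingleThreadExecutor();
    // Messages in the order the receiving ("JM") side handled them.
    private final List<String> received = Collections.synchronizedList(new ArrayList<>());

    // Async send: returns immediately, like the async RPC call from the TM.
    void send(String message) {
        channel.execute(() -> received.add(message));
    }

    // Waits until the channel is drained and returns the order seen by the receiver.
    List<String> drain() {
        channel.shutdown();
        try {
            if (!channel.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("channel did not drain in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(received);
    }

    static List<String> demo() {
        OrderedChannelSketch tmToJm = new OrderedChannelSketch();
        tmToJm.send("SPLIT_REPORT"); // sent from within open()
        tmToJm.send("TASK_RUNNING"); // sent after open() has returned
        return tmToJm.drain();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Since a checkpoint can only be requested after the receiver has seen the
task status update, the split report is always handled before it in this
model, even though send() returns before the message is processed.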

Scenario 1:


> 1. Upon restart, Reader A reports assigned splits (1 and 2), and Reader B
> reports (3 and 4).
> 2. The enumerator receives these reports but only reassigns splits 1 and 2
> — not 3 and 4.
> 3. A checkpoint or savepoint is then triggered. Only splits 1 and 2 are
> recorded in the reader states; splits 3 and 4 are not persisted.
> 4. If the job is later restarted from this checkpoint, splits 3 and 4 will
> be permanently lost.

This scenario is possible. One solution is to let the enumerator
implementation handle this. That means if the enumerator relies on the
initial split reports from the source readers, it should maintain these
reports by itself. In the above example, the enumerator will need to
remember that splits 3 and 4 are not assigned and put them into its own
state.
The current contract is that anything assigned to the SourceReaders
is completely owned by the SourceReaders. Enumerators can remember the
assignments but cannot change them, even when a source reader recovers /
restarts.
With this FLIP, the contract becomes that the source readers return
the ownership of their splits to the enumerator. So the enumerator is
responsible for maintaining these splits until they are assigned to a
source reader again.
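
As a rough sketch of what "maintaining these splits" could look like on
the enumerator side, the plain-Java model below keeps reported but not yet
reassigned splits in the enumerator's own state. This is not the Flink
SplitEnumerator interface: the class ReturnedSplitStateSketch and its
methods onReaderReport(), onAssigned(), and snapshotState() are
hypothetical, and splits are reduced to integer IDs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical model, not the Flink SplitEnumerator interface.
class ReturnedSplitStateSketch {
    // Splits handed back by readers that have not been reassigned yet.
    private final Set<Integer> unassigned = new TreeSet<>();

    // A reader reports the splits it restored; ownership moves to the enumerator.
    void onReaderReport(List<Integer> reportedSplits) {
        unassigned.addAll(reportedSplits);
    }

    // The enumerator assigns a split; ownership moves to the receiving reader.
    void onAssigned(int splitId) {
        if (!unassigned.remove(splitId)) {
            throw new IllegalStateException("split " + splitId + " is not held by the enumerator");
        }
    }

    // What the enumerator would include in its own checkpoint state.
    List<Integer> snapshotState() {
        return new ArrayList<>(unassigned);
    }

    // Scenario 1: A reports (1, 2), B reports (3, 4), only 1 and 2 are reassigned.
    static List<Integer> demo() {
        ReturnedSplitStateSketch enumerator = new ReturnedSplitStateSketch();
        enumerator.onReaderReport(List.of(1, 2));
        enumerator.onReaderReport(List.of(3, 4));
        enumerator.onAssigned(1);
        enumerator.onAssigned(2);
        return enumerator.snapshotState();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this model, a checkpoint taken at step 3 of Scenario 1 still contains
splits 3 and 4, because they sit in the enumerator state rather than in any
reader state.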

There are other cases where the reader and the enumerator may have
conflicting information. For example, consider the following sequence:
1. Reader A reports splits (1 and 2) upon restart.
2. The enumerator receives the report and assigns both 1 and 2 to reader B.
3. Reader A fails before checkpointing. This is a partial failure, so
only reader A restarts.
4. When reader A recovers, it will again report splits (1 and 2) to the
enumerator.
5. The enumerator should ignore this report because it has already assigned
splits (1 and 2) to reader B.

So with the new contract, the enumerator should be the source of truth for
split ownership.
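
The sequence above can be sketched as follows. Again this is a plain-Java
model under stated assumptions, not Flink code: SplitOwnershipSketch,
assign(), and acceptReport() are hypothetical names, splits are integer
IDs, and readers are identified by subtask index (0 for reader A, 1 for
reader B).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model, not Flink code: the enumerator decides who owns a split.
class SplitOwnershipSketch {
    // Split ID -> subtask index of the reader the enumerator assigned it to.
    private final Map<Integer, Integer> owner = new HashMap<>();

    void assign(int splitId, int reader) {
        owner.put(splitId, reader);
    }

    // Returns the reported splits the enumerator takes back. Splits that the
    // enumerator has since assigned to a different reader are ignored as stale.
    List<Integer> acceptReport(int reader, List<Integer> reportedSplits) {
        List<Integer> accepted = new ArrayList<>();
        for (int splitId : reportedSplits) {
            Integer current = owner.get(splitId);
            if (current == null || current == reader) {
                owner.remove(splitId); // ownership returns to the enumerator
                accepted.add(splitId);
            }
        }
        return accepted;
    }

    // Replays steps 1 to 5: the second report from reader A is ignored.
    static List<List<Integer>> demo() {
        SplitOwnershipSketch enumerator = new SplitOwnershipSketch();
        List<Integer> first = enumerator.acceptReport(0, List.of(1, 2));
        enumerator.assign(1, 1);
        enumerator.assign(2, 1);
        List<Integer> second = enumerator.acceptReport(0, List.of(1, 2));
        return List.of(first, second);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The first report is accepted in full, while the repeated report after the
partial failure yields an empty list, so reader B keeps splits 1 and 2.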

Thanks,

Jiangjie (Becket) Qin

On Fri, Aug 8, 2025 at 12:58 AM Hongshun Wang <[email protected]>
wrote:

> Hi Becket,
>
>
> I did consider this approach at the beginning (and it was also mentioned
> in this FLIP), since it would allow more flexibility in reassigning all
> splits. However, there are a few potential issues.
>
> 1. High Transmission Cost
> If we pass the full split objects (rather than just split IDs), the data
> size could be significant, leading to high overhead during transmission —
> especially when many splits are involved.
>
> 2. Risk of Split Loss
>
> Risk of split loss exists unless we have a mechanism to make sure that a
> checkpoint can only happen after all the splits are reassigned.
> There are scenarios where splits could be lost due to inconsistent state
> handling during recovery:
>
>
> Scenario 1:
>
>
>    1. Upon restart, Reader A reports assigned splits (1 and 2), and
>    Reader B reports (3 and 4).
>    2. The enumerator receives these reports but only reassigns splits 1
>    and 2 — not 3 and 4.
>    3. A checkpoint or savepoint is then triggered. Only splits 1 and 2
>    are recorded in the reader states; splits 3 and 4 are not persisted.
>    4. If the job is later restarted from this checkpoint, splits 3 and 4
>    will be permanently lost.
>
>
> Scenario 2:
>
>    1. Reader A reports splits (1 and 2), and Reader B reports (3 and 4)
>    upon restart.
>    2. Before the enumerator receives all reports and performs
>    reassignment, a checkpoint is triggered.
>    3. Since no splits have been reassigned yet, both readers have empty
>    states.
>    4. When restarting from this checkpoint, all four splits are lost.
>
>
> Let me know if you have thoughts on how we might mitigate these risks!
>
> Best
> Hongshun
>
> On Fri, Aug 8, 2025 at 1:46 AM Becket Qin <[email protected]> wrote:
>
>> Hi Hongshun,
>>
>> The steps sound reasonable to me in general. In terms of the updated FLIP
>> wiki, it would be good to see if we can keep the protocol simple. One
>> alternative way to achieve this behavior is the following:
>>
>> 1. Upon SourceOperator startup, the SourceOperator sends
>> ReaderRegistrationEvent with the currently assigned splits to the
>> enumerator. It does not add these splits to the SourceReader.
>> 2. The enumerator will always use
>> SplitEnumeratorContext.assignSplits() to assign the splits (not via the
>> response to the ReaderRegistrationEvent; this allows async split assignment
>> in case the enumerator wants to wait until all the readers are registered).
>> 3. The SourceOperator will only call SourceReader.addSplits() when it
>> receives the AddSplitEvent from the enumerator.
>>
>> This protocol has a few benefits:
>> 1. it basically allows arbitrary split reassignment upon restart
>> 2. simplicity: there is only one way to assign splits.
>>
>> So we only need one interface change:
>> - add the initially assigned splits to ReaderInfo so the Enumerator can
>> access it.
>> and one behavior change:
>> - The SourceOperator should stop assigning splits to the SourceReader from
>> state restoration, and only do that when it receives an AddSplitEvent from
>> the enumerator.
>>
>> The enumerator story is also simple:
>> 1. Receive some kind of notification (new partition, new reader, etc)
>> 2. look at the reader information (in the enumerator context or
>> self-maintained state)
>> 3. assign splits via the enumerator context.
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>> On Thu, Aug 7, 2025 at 1:31 AM Hongshun Wang <[email protected]>
>> wrote:
>>
>>> Hi Becket,
>>> Thanks for your advice — I’ve quickly learned a lot about the reader’s
>>> design principle. It’s really interesting!
>>>
>>> > One principle we want to follow is that the enumerator should be the
>>> brain doing the splits assignment, while the source readers read from the
>>> assigned splits. So we want to avoid the case where the SourceReader
>>> ignores the split assignment.
>>>
>>> It appears that MySQL CDC currently bypasses this principle by
>>> proactively removing unused splits directly in the SourceReader. This may
>>> be due to the lack of built-in framework support for such cleanup, forcing
>>> connectors to handle it manually. However, this responsibility ideally
>>> belongs in the framework.
>>>
>>> With this FLIP, we propose a redesigned mechanism that centralizes split
>>> cleanup logic in the SplitEnumerator, allowing connectors like MySQL CDC to
>>> eventually adopt it (@leneord, CC).
>>>
>>>
>>> To achieve this, we must carefully manage state consistency during
>>> startup and recovery. The proposed approach is as follows:
>>>
>>>    1. Reader Registration with Deferred Assignment
>>>    When a reader starts (SourceOperator#open), it sends a
>>>    ReaderRegistrationEvent to the SplitEnumerator, including its
>>>    previously assigned splits (restored from state). However, these splits
>>>    are not yet assigned to the reader. The SourceOperator is placed in
>>>    a PENDING state.
>>>    2. Prevent State Pollution During Registration
>>>    While in the PENDING state, SourceOperator#snapshotState will not
>>>    update the operator state. This prevents empty or outdated reader state
>>>    (e.g., with removed splits) from polluting the checkpoint.
>>>    3. Enumerator Performs Split Cleanup and Acknowledges
>>>    Upon receiving the ReaderRegistrationEvent, the SplitEnumerator removes
>>>    any splits that are no longer valid (e.g., due to removed topics or
>>>    tables)
>>>    and returns the list of remaining valid split IDs to the reader via a
>>>    ReaderRegistrationACKEvent.
>>>    For backward compatibility, the default behavior is to return all
>>>    split IDs (i.e., no filtering).
>>>    4. Finalize Registration and Resume Normal Operation
>>>    When the SourceOperator receives the ReaderRegistrationACKEvent, it
>>>    assigns the confirmed splits to the reader and transitions its state to
>>>    REGISTERED. From this point onward, SourceOperator#snapshotState can
>>>    safely update the operator state.
>>>
>>>
>>> Best,
>>> Hongshun
>>>
>>> On Thu, Aug 7, 2025 at 1:57 AM Becket Qin <[email protected]> wrote:
>>>
>>>>> SourceCoordinator doesn't store splits that have already been assigned
>>>>> to readers, and SplitAssignmentTracker stores the splits only for this
>>>>> checkpoint, which will be removed after checkpoint. Maybe you mean
>>>>> SourceOperator?
>>>>
>>>> Yes, I meant SourceOperator.
>>>>
>>>>> At the beginning, I also thought about using it. However, there are two
>>>>> situations:
>>>>> 1. During restart, if source options remove a topic or table:
>>>>> sometimes connectors like MySQL CDC will remove unused splits after
>>>>> restart in MySqlSourceReader#addSplits [1]. Kafka lacks this, so if the
>>>>> configured topics change, removed topic's splits are still read. I also
>>>>> want to do the same thing in Kafka.
>>>>> 2. In Kafka or MySQL CDC, some bounded splits, if finished, can be
>>>>> removed after restart.
>>>>> In these cases, I have to get the assigned splits after
>>>>> SourceReader#addSplits, rather than get them from SourceOperator
>>>>> directly.
>>>>
>>>>
>>>> One principle we want to follow is that the enumerator should be the
>>>> brain doing the splits assignment, while the source readers read from the
>>>> assigned splits. So we want to avoid the case where the SourceReader
>>>> ignores the split assignment. Given this principle,
>>>> For case 1, if there is a subscription change, it might be better to
>>>> hold back calling SourceReader.addSplits() until an assignment is confirmed
>>>> by the Enumerator. In fact, this might be a good default behavior
>>>> regardless of whether there is a subscription change.
>>>> For case 2: if a bounded split is finished, the
>>>> SourceReader.snapshotState() will not contain that split. So upon
>>>> restoration, those splits should not appear, right?
>>>>
>>>> Thanks,
>>>>
>>>> Jiangjie (Becket) Qin
>>>>
>>>> On Wed, Aug 6, 2025 at 5:19 AM Hongshun Wang <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi Becket,
>>>>>
>>>>> Thank you a lot for your advice, which helped me a lot.
>>>>> >  It seems that we don't need the method `SourceReader.
>>>>> getAssignedSplits()`. The assigned splits are available in the
>>>>> SourceCoordinator upon state restoration.
>>>>>
>>>>>  SourceCoordinator doesn't store splits that have already been
>>>>> assigned to readers, and SplitAssignmentTracker stores the splits only for
>>>>> this checkpoint, which will be removed after checkpoint. Maybe you mean
>>>>> SourceOperator?
>>>>>
>>>>> At the beginning, I also thought about using it. However, there are
>>>>> two situations:
>>>>> 1. During restart, if source options remove a topic or table:
>>>>> sometimes connectors like MySQL CDC will remove unused splits after
>>>>> restart in MySqlSourceReader#addSplits [1]. Kafka lacks this, so if the
>>>>> configured topics change, removed topic's splits are still read. I also
>>>>> want to do the same thing in Kafka.
>>>>> 2. In Kafka or MySQL CDC, some bounded splits, if finished, can be
>>>>> removed after restart.
>>>>> In these cases, I have to get the assigned splits after
>>>>> SourceReader#addSplits, rather than get them from SourceOperator
>>>>> directly.
>>>>>
>>>>> >  By design, the SplitEnumerator can get the reader information any
>>>>> time from the `SplitEnumeratorContext.registeredReaders()`.
>>>>> It looks good.
>>>>>
>>>>> Thanks again.
>>>>>
>>>>> Best,
>>>>> Hongshun
>>>>>
>>>>>
>>>>> [1]
>>>>> https://github.com/apache/flink-cdc/blob/42f91a864e329c00959828fe0ca4f1e9e8e1de75/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java#L238
>>>>>
>>>>>
>>>>> On Tue, Aug 5, 2025 at 2:35 PM Becket Qin <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi Hongshun,
>>>>>>
>>>>>> Thanks for the proposal. The current Kafka split assignment algorithm
>>>>>> does seem to have issues. (I cannot recall why it was implemented this
>>>>>> way at that time...).
>>>>>>
>>>>>> Two quick comments:
>>>>>> 1. It seems that we don't need the method `SourceReader.
>>>>>> getAssignedSplits()`. The assigned splits are available in the
>>>>>> SourceCoordinator upon state restoration and can be put into the
>>>>>> ReaderRegistrationEvent.
>>>>>> 2. Instead of adding the method `SplitEnumerator.addReader(int
>>>>>> subtaskId, List<SplitT> assignedSplits)`, add a new field of
>>>>>> `InitialSplitAssignment` to the ReaderInfo. By design, the
>>>>>> SplitEnumerator can get the reader information any time from the
>>>>>> `SplitEnumeratorContext.registeredReaders()`. This also spares the
>>>>>> Enumerator implementation from having to remember the initially
>>>>>> assigned splits if it wants to wait until all the readers are
>>>>>> registered. This also allows future addition of reader information.
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Jiangjie (Becket) Qin
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, Aug 4, 2025 at 8:39 PM Hongshun Wang <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Could anyone familiar with the Kafka connector help review this FLIP?
>>>>>>> I am looking forward to your reply.
>>>>>>>
>>>>>>> Best
>>>>>>> Hongshun
>>>>>>>
>>>>>>> On Thu, Jul 24, 2025 at 8:13 PM Leonard Xu <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Thanks Hongshun for driving this work.
>>>>>>>>
>>>>>>>>
>>>>>>>> We are also suffering from this issue in production Kafka restoration
>>>>>>>> usage. The current design is a nice tradeoff and has considered the
>>>>>>>> new Source implementation details, +1 from my side.
>>>>>>>>
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Leonard
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> > On Jul 19, 2025, at 18:59, Hongshun Wang <[email protected]> wrote:
>>>>>>>> >
>>>>>>>> > Hi devs,
>>>>>>>> >
>>>>>>>> > I'd like to initiate a discussion about [FLIP-537: Enumerator with
>>>>>>>> > Global Split Assignment Distribution for Balanced Split Assignment]
>>>>>>>> > [1], which addresses critical limitations in our current Kafka
>>>>>>>> > connector split distribution mechanism.
>>>>>>>> >
>>>>>>>> > As documented in [FLINK-31762] [2], several scenarios currently lead
>>>>>>>> > to uneven Kafka split distribution, causing reader delays and
>>>>>>>> > performance bottlenecks. The core issue stems from the enumerator's
>>>>>>>> > lack of visibility into post-assignment split distribution.
>>>>>>>> >
>>>>>>>> > This FLIP does two things:
>>>>>>>> > 1. ReaderRegistrationEvent Enhancement: SourceOperator should send
>>>>>>>> > ReaderRegistrationEvent with assigned splits metadata after startup
>>>>>>>> > to ensure state consistency.
>>>>>>>> > 2. Implementation in the Kafka connector to resolve imbalanced splits
>>>>>>>> > and state awareness during recovery (the enumerator will always
>>>>>>>> > choose the least assigned subtask, and reason also as follows)
>>>>>>>> >
>>>>>>>> > Any additional questions regarding this FLIP? Looking forward to
>>>>>>>> > hearing from you.
>>>>>>>> >
>>>>>>>> > Best
>>>>>>>> > Hongshun
>>>>>>>> >
>>>>>>>> >
>>>>>>>> > [1]
>>>>>>>> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
>>>>>>>> > [2] https://issues.apache.org/jira/browse/FLINK-31762
>>>>>>>>
>>>>>>>>
