tyamashi-oss commented on pull request #10644:
URL: https://github.com/apache/kafka/pull/10644#issuecomment-1006638780
I'm having a similar problem and found this PR while looking for a solution.
Please forgive me for expressing my personal opinion and sharing my similar
problem.
I believe the consumer offset should be the log-end-offset, not 0, when the
translated offset is larger than the partition's end offset.
(In the original problem's reproduction the log-end-offset is also 0, so the
current implementation that resets the offset to 0 can simply be replaced with
the log-end-offset.)
When the translated offset is larger than the partition's end offset or lower
than the partition's earliest offset, the consumer client in the target cluster
will be reset according to "auto.offset.reset" (default: latest) as soon as it
starts, because the broker will return an InvalidOffsetException. The consumer
in the target cluster will then duplicate or skip a large number of messages.
To avoid this automatic reset by the consumer, I believe the translated offset
should stay within the valid log offset range (from the earliest offset to the
end offset). So the consumer offset should be the log-end-offset, not 0, when
the translated offset is larger than the partition's end offset.
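A minimal sketch of the clamping I have in mind (the class and method names here are hypothetical illustrations, not MM2's actual API):

```java
// Hypothetical sketch: clamp a translated offset into the target partition's
// valid range [logStartOffset, logEndOffset] so that committing it never
// triggers the consumer's auto.offset.reset behavior.
public class OffsetClamp {
    public static long clamp(long translatedOffset, long logStartOffset, long logEndOffset) {
        // Beyond the end of the log: resume from the end (lag 0) instead of 0.
        if (translatedOffset > logEndOffset) return logEndOffset;
        // Below the earliest retained offset: resume from the earliest.
        if (translatedOffset < logStartOffset) return logStartOffset;
        return translatedOffset;
    }

    public static void main(String[] args) {
        // Values from the reproduction below: translated offset 1,010,000
        // against a target log spanning 10,001..110,000.
        System.out.println(clamp(1_010_000L, 10_001L, 110_000L)); // 110000
    }
}
```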
FYI, in addition to the original problem, I also found another way to
reproduce the negative lag caused by the translated offset being larger than
the partition's end offset. In the following case, the fix is to use the
log-end-offset for the translated offset (not 0, because 0 would lead to a
large number of duplicate messages).
~~~
0. The topic can hold only 100,000 messages based on retention.bytes.
1. Add 10,000 messages to the topic on the source cluster and consume them
with a consumer on the source cluster, then wait for MM2 to synchronize.
=>
The source cluster:
log: from 1 to 10,000
Consumer group offset : 10,000
lag: 0
The target cluster:
log: from 1 to 10,000
Consumer group offset : 10,000
lag: 0
2. Stop MM2, then add 1,000,000 messages to the topic on the source cluster
and consume them with a consumer on the source cluster.
=>
The source cluster:
log: from 910,001 to 1,010,000 (cut by retention.bytes)
Consumer group offset : 1,010,000
lag: 0
The target cluster [not changed]:
log: from 1 to 10,000
Consumer group offset : 10,000
lag: 0
3. Restart MM2, then wait for MM2 to synchronize.
Although this is a timing issue, the problem occurs if the
MirrorCheckpointConnector synchronizes the consumer group offset before the
MirrorSourceConnector updates the offsetSyncTopic.
=>
The source cluster [not changed]:
log: from 910,001 to 1,010,000
Consumer group offset : 1,010,000
lag: 0
The target cluster:
log: from 10,001 to 110,000
Consumer group offset : 1,010,000
lag: -900,000
~~~
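The negative lag in step 3 follows directly from lag = log-end-offset minus committed offset. A small sketch reproducing the arithmetic with the values from the scenario above (the class is illustrative only):

```java
// Reproduces the lag arithmetic from step 3 of the scenario above.
public class NegativeLagDemo {
    public static long lag(long logEndOffset, long committedOffset) {
        return logEndOffset - committedOffset;
    }

    public static void main(String[] args) {
        long targetLogEnd = 110_000L;   // target log end after MM2 replicates
        long syncedOffset = 1_010_000L; // source offset applied before the offset sync catches up
        System.out.println(lag(targetLogEnd, syncedOffset)); // -900000: the reported negative lag
        // Using the target's log-end-offset instead keeps the lag at 0.
        System.out.println(lag(targetLogEnd, targetLogEnd)); // 0
    }
}
```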
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]