[ https://issues.apache.org/jira/browse/KAFKA-13979?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Fathima Khazana Abdul Haiyum updated KAFKA-13979:
-------------------------------------------------
    Description: 
We have 3 nodes in our MSK cluster which run Apache Kafka 2.6.2. We have 15 
partitions for a topic and 5 consumers in our consumer group, where each 
consumer runs on its own Java application server. Whenever we do a rolling 
deploy to our servers, we notice a huge consumer lag on *some* of the 15 
partitions. It appears that, after rebalancing, the consumer resets its 
committed offset and reprocesses messages. For example, this is what I'm seeing:
{code:java}
logger_name:org.apache.kafka.clients.consumer.internals.ConsumerCoordinator 
message:[Consumer clientId=myService-mytopic-0, groupId=myService-mytopic] 
Committed offset 3044 for partition mytopic-0{code}

So we know for a fact that offset 3044 has been committed for partition 0.

Running {{./kafka-consumer-groups.sh --describe}} gives the following:
{code:java}
GROUP              PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG   CONSUMER-ID  CLIENT-ID
myService-mytopic  0          3044            3044            0                  myService-mytopic-0
{code}

After a deploy, which removes the consumer from the group, triggers a 
rebalance, and then adds the consumer back, I see this:
{code:java}
GROUP              PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG   CONSUMER-ID  CLIENT-ID
myService-mytopic  0          1890            3047            1157               myService-mytopic-0
{code}

In the application logs, I see this:
{code:java}
logger_name:org.apache.kafka.clients.consumer.internals.Fetcher 
message:[Consumer clientId=myService-mytopic-0, groupId=myService-mytopic] 
Fetch position FetchPosition{offset=1890, offsetEpoch=Optional.empty, 
currentLeader=LeaderAndEpoch{leader=Optional[b-3.kafka-mytestserver.1gkwlu.c16.kafka.us-east-1.amazonaws.com:9098
 (id: 3 rack: use1-az1)], epoch=0}} is out of range for partition mytopic-0, 
resetting offset{code}
Why does Kafka fetch from current offset 1890, which is before the committed 
offset (3044) for the partition, after the rebalance? This is on a test 
environment where less than one message is produced per second. The issue 
occurs with both auto commit (default interval) and manual commit, and on 
Kafka versions 2.6.2 and 2.8.1. In production, where traffic is much higher, 
it causes reprocessing of around 2 million messages per partition.
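
The offsets in the two {{--describe}} outputs can be cross-checked with simple arithmetic. The minimal Java sketch below assumes that LAG is the log-end offset minus the current (committed) offset, and that offsets on this partition are contiguous (no gaps from compaction or transaction markers). The {{OffsetMath}} class is hypothetical and not part of {{kafka-clients}}.

```java
// Hypothetical helper for the offset arithmetic in this report; not a kafka-clients API.
final class OffsetMath {
    private OffsetMath() {}

    /** LAG as shown by kafka-consumer-groups.sh: log-end offset minus current offset, clamped at zero here. */
    static long lag(long logEndOffset, long currentOffset) {
        return Math.max(0L, logEndOffset - currentOffset);
    }

    /**
     * Records that were already committed but are delivered again when the
     * consumer restarts from newPosition. Assumes contiguous offsets.
     */
    static long redelivered(long committedOffset, long newPosition) {
        return Math.max(0L, committedOffset - newPosition);
    }
}
```

With the numbers above, {{OffsetMath.lag(3047, 1890)}} returns 1157, matching the LAG column after the deploy, and {{OffsetMath.redelivered(3044, 1890)}} returns 1154, the number of already-committed records on partition 0 that get consumed a second time.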

In case it matters: {{auto.offset.reset=latest}} and {{retention.ms=1000}}. We're 
using the Java client {{kafka-clients}} version 3.0.0.
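
The Fetcher log above is written when the fetch position falls outside the offsets the broker still holds, after which the consumer applies {{auto.offset.reset}}. The sketch below is a deliberately simplified, hypothetical model of that decision, not the actual {{Fetcher}} code: a position counts as valid if it lies between the log-start offset and the log-end offset; otherwise {{latest}} jumps to the log-end offset, {{earliest}} to the log-start offset, and {{none}} fails. With {{retention.ms=1000}}, closed segments become eligible for deletion about one second after their newest record, so the log-start offset may advance quickly. The report does not give log-start offsets, so any values used with this model are assumptions.

```java
import java.util.Locale;

// Simplified, hypothetical model of out-of-range handling on the consumer;
// not the actual org.apache.kafka.clients.consumer.internals.Fetcher logic.
final class OffsetResetModel {
    private OffsetResetModel() {}

    enum ResetPolicy { EARLIEST, LATEST, NONE }

    /** Maps an auto.offset.reset value such as "latest" to a policy. */
    static ResetPolicy parse(String autoOffsetReset) {
        return ResetPolicy.valueOf(autoOffsetReset.trim().toUpperCase(Locale.ROOT));
    }

    /** True if the broker still holds this position (log-end offset inclusive). */
    static boolean inRange(long position, long logStartOffset, long logEndOffset) {
        return position >= logStartOffset && position <= logEndOffset;
    }

    /** Position the consumer continues from under this model. */
    static long nextPosition(long position, long logStartOffset, long logEndOffset, ResetPolicy policy) {
        if (inRange(position, logStartOffset, logEndOffset)) {
            return position; // valid position: keep fetching from here
        }
        switch (policy) {
            case EARLIEST:
                return logStartOffset;
            case LATEST:
                return logEndOffset;
            default:
                throw new IllegalStateException(
                        "offset " + position + " is out of range and auto.offset.reset=none");
        }
    }
}
```

For example, with a hypothetical log-start offset of 2000, position 1890 is out of range, and under {{latest}} this model moves it to the log-end offset 3047.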

The five consumers have the same {{client.id}}.
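
To reproduce the setup, the consumer settings described in this report can be collected in a plain {{java.util.Properties}} object. This is a sketch under assumptions: the bootstrap address is a caller-supplied placeholder, the shared {{client.id}} value is taken from the log lines above, and the {{ReportedConsumerConfig}} class is hypothetical. The string keys are standard consumer settings, and 5000 ms is the client default for {{auto.commit.interval.ms}}.

```java
import java.util.Properties;

// Hypothetical sketch of the consumer settings described in this report,
// using plain string keys so it compiles without kafka-clients on the classpath.
final class ReportedConsumerConfig {
    private ReportedConsumerConfig() {}

    static final String GROUP_ID = "myService-mytopic";
    // All five consumers share this client.id, as stated in the report.
    static final String SHARED_CLIENT_ID = "myService-mytopic-0";

    static Properties build(String bootstrapServers, boolean autoCommit) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers); // placeholder address
        props.setProperty("group.id", GROUP_ID);
        props.setProperty("client.id", SHARED_CLIENT_ID);
        props.setProperty("auto.offset.reset", "latest");
        if (autoCommit) {
            // Auto commit with the default interval (5000 ms).
            props.setProperty("enable.auto.commit", "true");
            props.setProperty("auto.commit.interval.ms", "5000");
        } else {
            // Manual commit: the application calls commitSync()/commitAsync() itself.
            props.setProperty("enable.auto.commit", "false");
        }
        return props;
    }
}
```

A {{KafkaConsumer}} also needs key and value deserializers, which the report does not mention and which are left out here.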

 

  was:
We have 3 nodes in our MSK cluster which run Apache Kafka 2.6.2. We have 15 
partitions and 5 consumers in our consumer group, where each consumer runs on 
its own Java application server. Whenever we do a rolling deploy to our servers, 
we notice a huge consumer lag on some of the 15 partitions. It appears that, 
after rebalancing, the consumer resets its committed offset and reprocesses 
messages. For example, this is what I'm seeing:

{code:java}
logger_name:org.apache.kafka.clients.consumer.internals.ConsumerCoordinator
message:[Consumer clientId=myService-mytopic-0, groupId=myService-mytopic] 
Committed offset 3044 for partition mytopic-0{code}

So we know for a fact that the offset 3044 has been committed.

Running {{./kafka-consumer-groups.sh --describe}} gives the following:

{code:java}
GROUP              PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG   CONSUMER-ID  CLIENT-ID
myService-mytopic  0          3044            3044            0                  myService-mytopic-0
{code}

After a deploy, which in turn removes the consumer from the group and triggers 
a rebalance, I see this:

{code:java}
GROUP              PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG   CONSUMER-ID  CLIENT-ID
myService-mytopic  0          1890            3047            1157               myService-mytopic-0
{code}

In the logs, I see this:

{code:java}
logger_name:org.apache.kafka.clients.consumer.internals.Fetcher
message:[Consumer clientId=myService-mytopic-0, groupId=myService-mytopic] 
Fetch position FetchPosition{offset=1890, offsetEpoch=Optional.empty, 
currentLeader=LeaderAndEpoch{leader=Optional[b-3.kafka-mytestserver.1gkwlu.c16.kafka.us-east-1.amazonaws.com:9098
 (id: 3 rack: use1-az1)], epoch=0}} is out of range for partition mytopic-0, 
resetting offset{code}

Why does Kafka fetch from current offset 1890, which is before the committed 
offset for the partition, after the rebalance? This is on a test environment 
where less than one message is produced per second. The issue occurs with both 
auto commit (default interval) and manual commit, and on Kafka versions 
2.6.2 and 2.8.1. In production, where traffic is much higher, it causes 
reprocessing of around 2 million messages per partition.

In case it matters: {{auto.offset.reset=latest}} and {{retention.ms=1000}}. We're 
using the Java client {{kafka-clients}} version 3.0.0.

The five consumers have the same {{client.id}}.

 


> Kafka resets committed offset after rebalance
> ---------------------------------------------
>
>                 Key: KAFKA-13979
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13979
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 2.6.2
>            Reporter: Fathima Khazana Abdul Haiyum
>            Priority: Critical
>
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
