[ https://issues.apache.org/jira/browse/KAFKA-6822?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Phil Mikhailov updated KAFKA-6822:
----------------------------------
Description:
We have microservices built on Kafka Streams that get stuck initializing the
stream topology while filling a StateStore from Kafka using the Kafka consumer.
The microservices are built with Kafka Streams 0.10.2.1-cp1 (Confluent 3.2.1),
but the environment runs a Kafka 1.0.0 cluster (Confluent 4.0.0).
We reproduced this problem several times by restarting the microservices, and
eventually had to reset the stream offsets to the beginning in order to unblock
them.
While investigating this problem more deeply, we found that the StateStore
(0.10.2.1) gets stuck loading data via {{ProcessorStateManager}}. It fills the
store using KafkaConsumer (0.10.2.1), which calculates the next offset like this:
Fetcher:524
{code:java}
long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;
{code}
That is, it takes the offset of the last record returned by {{poll}} and adds 1,
so the next offset is only an estimate.
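A minimal sketch of that 0.10.2.1 arithmetic, assuming that some offsets below the end offset never reach the consumer (for example because compaction removed them). The class {{EstimatedNextOffset}} and its method are hypothetical illustrations, not Kafka APIs; they only mimic the Fetcher:524 line quoted above.

{code:java}
import java.util.List;

// Hypothetical illustration of the 0.10.2.1 Fetcher:524 arithmetic; not a Kafka API.
class EstimatedNextOffset {

    // Position after a poll: offset of the last returned record + 1.
    // If poll() returned nothing, the position does not move.
    static long estimateNextOffset(List<Long> returnedOffsets, long currentPosition) {
        if (returnedOffsets.isEmpty()) {
            return currentPosition;
        }
        return returnedOffsets.get(returnedOffsets.size() - 1) + 1;
    }

    public static void main(String[] args) {
        // Assumption: offsets 2-4, 6, 8 and 9 no longer reach the consumer
        // (e.g. removed by compaction), while the end offset is still 10.
        List<Long> returned = List.of(0L, 1L, 5L, 7L);
        long position = estimateNextOffset(returned, 0L);
        System.out.println("estimated position = " + position); // prints 8, below the end offset 10
    }
}
{code}

Under this assumption, later polls return no records, so the estimated position stays at 8.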
In Kafka 1.0.0, by contrast, the consumer takes the next offset to fetch from
the record batches returned by the broker:
{code:java}
long nextOffset = partitionRecords.nextFetchOffset;
{code}
This is the actual next offset rather than an estimate.
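For contrast, a hypothetical model of batch-based tracking. It assumes message format v2, where a batch header keeps the batch's last offset even if trailing records inside it were compacted away. {{BatchNextOffset}}, {{Batch}} and {{nextFetchOffset(...)}} are illustrative names, not the real {{Fetcher}} internals.

{code:java}
import java.util.List;

// Hypothetical model of batch-based offset tracking; not the real Fetcher internals.
class BatchNextOffset {

    // A fetched batch: the last offset from its header plus the records that survived.
    static final class Batch {
        final long lastOffset;
        final List<Long> recordOffsets;

        Batch(long lastOffset, List<Long> recordOffsets) {
            this.lastOffset = lastOffset;
            this.recordOffsets = recordOffsets;
        }
    }

    // Once all batches are consumed, the next fetch offset is the last batch's
    // header offset + 1, no matter which records inside the batch survived.
    static long nextFetchOffset(List<Batch> batches, long currentPosition) {
        long next = currentPosition;
        for (Batch batch : batches) {
            next = batch.lastOffset + 1;
        }
        return next;
    }

    public static void main(String[] args) {
        // Assumption: one batch covering offsets 0-9 whose header keeps lastOffset = 9,
        // although only records 0, 1, 5 and 7 remain after compaction.
        List<Batch> fetched = List.of(new Batch(9L, List.of(0L, 1L, 5L, 7L)));
        System.out.println("next fetch offset = " + nextFetchOffset(fetched, 0L)); // prints 10
    }
}
{code}

Because the position comes from batch metadata rather than from the last surviving record, it can move past offsets that were removed.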
In our case, the StateStore (0.10.2.1) got stuck loading data. The cause was in
{{ProcessorStateManager.restoreActiveState:245}}, which kept spinning in the
consumer loop because the following condition never became true:
{code:java}
} else if (restoreConsumer.position(storePartition) == endOffset) {
break;
}
{code}
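A bounded simulation of that exit condition, assuming that some offsets below the end offset never reach the consumer (for example because of compaction). The class {{RestoreLoopSimulation}}, its {{poll}} and {{restore}} helpers, and the two-records-per-poll limit are hypothetical; the real loop in {{ProcessorStateManager}} has more branches. {{restore}} returns -1 where the real loop would spin forever.

{code:java}
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of the "position == endOffset" exit check; not Kafka code.
class RestoreLoopSimulation {

    // Models one poll(): up to maxRecords surviving offsets at or after position.
    static List<Long> poll(List<Long> surviving, long position, int maxRecords) {
        List<Long> out = new ArrayList<>();
        for (long offset : surviving) {
            if (offset >= position && out.size() < maxRecords) {
                out.add(offset);
            }
        }
        return out;
    }

    // Returns the number of polls until position == endOffset,
    // or -1 if maxPolls is reached without the condition ever becoming true.
    static int restore(List<Long> surviving, long endOffset, int maxPolls) {
        long position = 0L;
        for (int polls = 1; polls <= maxPolls; polls++) {
            List<Long> records = poll(surviving, position, 2); // assumed 2 records per poll
            if (!records.isEmpty()) {
                position = records.get(records.size() - 1) + 1; // 0.10.2.1-style estimate
            }
            if (position == endOffset) {
                return polls;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        List<Long> surviving = List.of(0L, 1L, 5L, 7L);
        System.out.println(restore(surviving, 8L, 100));  // prints 2
        System.out.println(restore(surviving, 10L, 100)); // prints -1
    }
}
{code}

With an end offset of 8 the position lands exactly on it; with 10 the position stops at 8, so {{position == endOffset}} is never true.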
We assume that the 0.10.2.1 consumer estimates the next offset (its position)
incorrectly in the case of compaction, so the position never reaches endOffset.
Alternatively, offsets are calculated inconsistently between 0.10.2.1 and
1.0.0, which prevents a 0.10.2.1 consumer from working with a 1.0.0 broker.
was:
We ran into a problem where the StateStore (0.10.2.1) got stuck loading data
during microservice startup.
Our cluster runs Kafka 1.0.0, but the microservices are built with Kafka
Streams 0.10.2.1.
We had to reset the stream offsets in order to unblock the microservices
because restarts didn't help.
We hit the problem only once and didn't have a chance to reproduce it, so we
apologize in advance if the explanation is incomplete.
Below are the details we managed to collect at the time:
The Kafka 0.10.2.1 consumer calculates the next offset like this:
Fetcher:524
{code:java}
long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;
{code}
That is, it takes the offset of the last record returned by {{poll}} and adds 1,
so the next offset is only an estimate.
In Kafka 1.0.0, by contrast, the consumer takes the next offset to fetch from
the record batches returned by the broker:
{code:java}
long nextOffset = partitionRecords.nextFetchOffset;
{code}
This is the actual next offset rather than an estimate.
In our case, the StateStore (0.10.2.1) got stuck loading data. The cause was in
{{ProcessorStateManager.restoreActiveState:245}}, which kept spinning in the
consumer loop because this condition never became true:
{code:java}
} else if (restoreConsumer.position(storePartition) == endOffset) {
break;
}
{code}
We assume that the 0.10.2.1 consumer estimates the next offset (its position)
incorrectly in the case of compaction, so the position never reaches endOffset.
Alternatively, offsets are calculated inconsistently between 0.10.2.1 and
1.0.0, which prevents a 0.10.2.1 consumer from working with a 1.0.0 broker.
> Kafka Consumer 0.10.2.1 can not normally read data from Kafka 1.0.0
> -------------------------------------------------------------------
>
> Key: KAFKA-6822
> URL: https://issues.apache.org/jira/browse/KAFKA-6822
> Project: Kafka
> Issue Type: Bug
> Components: consumer, streams
> Reporter: Phil Mikhailov
> Priority: Major
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)