[
https://issues.apache.org/jira/browse/KAFKA-7672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
linyue li updated KAFKA-7672:
-----------------------------
Description:
Normally, when a task is migrated to a new thread and no checkpoint file is
found under its task folder, Kafka Streams needs to restore the local state
from the remote changelog topic completely and then resume running. However, in
some scenarios, we found that Kafka Streams does *NOT* restore this state even
though no checkpoint was found; it just cleans the state folder and transitions
to the running state directly, resulting in the loss of historical data.
To be specific, here are the detailed Kafka Streams logs from our project that
show this scenario:
{quote}2018-10-23 08:27:07,684 INFO
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer
clientId=AuditTrailBatch-StreamThread-1-consumer, groupId=AuditTrailBatch]
Revoking previously assigned partitions [AuditTrailBatch-0-5]
2018-10-23 08:27:07,684 INFO
org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
[AuditTrailBatch-StreamThread-1] State transition from PARTITIONS_ASSIGNED to
PARTITIONS_REVOKED
2018-10-23 08:27:10,856 INFO
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer
clientId=AuditTrailBatch-StreamThread-1-consumer, groupId=AuditTrailBatch]
(Re-)joining group
2018-10-23 08:27:53,153 INFO
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer
clientId=AuditTrailBatch-StreamThread-1-consumer, groupId=AuditTrailBatch]
Successfully joined group with generation 323
2018-10-23 08:27:53,153 INFO
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer
clientId=AuditTrailBatch-StreamThread-1-consumer, groupId=AuditTrailBatch]
Setting newly assigned partitions [AuditTrailBatch-store1-repartition-1]
2018-10-23 08:27:53,153 INFO
org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
[AuditTrailBatch-StreamThread-1] State transition from PARTITIONS_REVOKED to
PARTITIONS_ASSIGNED
2018-10-23 08:27:53,153 INFO
org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
[AuditTrailBatch-StreamThread-1] *Creating producer client for task 1_1*
2018-10-23 08:27:53,622 INFO
org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
[AuditTrailBatch-StreamThread-1] partition assignment took 469 ms.
2018-10-23 08:27:54,357 INFO
org.apache.kafka.streams.processor.internals.StoreChangelogReader -
stream-thread [AuditTrailBatch-StreamThread-1]*No checkpoint found for task 1_1
state store AuditTrailBatch-store1-changelog-1 with EOS turned on.*
*Reinitializing the task and restore its state from the beginning.*
2018-10-23 08:27:54,357 INFO
org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer
clientId=AuditTrailBatch-StreamThread-1-restore-consumer, groupId=]*Resetting
offset for partition AuditTrailBatch-store1-changelog-1 to offset 0.*
2018-10-23 08:27:54,653 INFO
org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
[AuditTrailBatch-StreamThread-1]*State transition from PARTITIONS_ASSIGNED to
RUNNING*
{quote}
From the logs above, we can reconstruct the sequence of events for thread
AuditTrailBatch-StreamThread-1:
# The task previously running on thread 1 is task 0_5 (the corresponding
partition is AuditTrailBatch-0-5).
# The group begins to rebalance, and the new task 1_1 is assigned to thread 1.
# No checkpoint is found under the 1_1 state folder, so the offset is reset to
0 and the local state folder is cleaned.
# Thread 1 transitions to the RUNNING state directly, without restoring task
1_1, so the historical data for the state of task 1_1 is lost on thread 1.
*Troubleshooting*
To investigate the cause of this issue, we analyzed the Kafka Streams source
code and found that the key is the variable named "completedRestorers".
This is the definition of the variable:
{code:java}
private final Set<TopicPartition> completedRestorers = new HashSet<>();{code}
Each thread object has its own completedRestorers, which is created during
thread initialization and is not accessed by other threads. The
completedRestorers set records the partitions that have been completely
restored in the thread.
{code:java}
if (restorer.hasCompleted(pos, endOffsets.get(partition))) {
    restorer.restoreDone();
    endOffsets.remove(partition);
    completedRestorers.add(partition);
}{code}
Once a partition is added to the completedRestorers set, it is returned by
restore() and passed to the next caller, updateRestored(); transitionToRunning()
then sets the task to the running state.
But we found that completedRestorers is NEVER cleared during the life cycle of
the thread, not even in the reset function:
{code:java}
@Override
public void reset() {
    partitionInfo.clear();
    stateRestorers.clear();
    needsRestoring.clear();
    endOffsets.clear();
    needsInitializing.clear();
}
{code}
This causes a problem. Suppose task 1 was once assigned to thread A, so its
partition has been added to completedRestorers. The task then migrated to
another thread (maybe in a different instance). After several rounds of
rebalancing, it is assigned to thread A again, and for some reason no
checkpoint is present. The right behavior is to clean the state folder and
restore the state from the beginning, but instead the thread finds that this
task's partition is already in the completedRestorers set, so it considers the
task completely restored and resumes running directly.
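The scenario above can be illustrated with a minimal sketch. It does not use
the Kafka classes: SketchChangelogReader, register(), alreadyMarkedCompleted()
and the clearCompletedOnReset flag are hypothetical names introduced only for
illustration, and partitions are plain strings. Only completedRestorers,
needsRestoring, restore() and reset() mirror names from the snippets above.

```java
import java.util.HashSet;
import java.util.Set;

public class StaleRestorerSketch {

    /** Heavily simplified, hypothetical stand-in for the per-thread changelog reader. */
    static class SketchChangelogReader {
        private final Set<String> needsRestoring = new HashSet<>();
        private final Set<String> completedRestorers = new HashSet<>();
        private final boolean clearCompletedOnReset;

        SketchChangelogReader(boolean clearCompletedOnReset) {
            this.clearCompletedOnReset = clearCompletedOnReset;
        }

        /** Registers a changelog partition that must be restored for the current assignment. */
        void register(String partition) {
            needsRestoring.add(partition);
        }

        /** Pretends to replay the changelog, then reports every partition marked as completed. */
        Set<String> restore() {
            completedRestorers.addAll(needsRestoring);
            needsRestoring.clear();
            return new HashSet<>(completedRestorers);
        }

        /** True if the partition is counted as restored before any work is done for it. */
        boolean alreadyMarkedCompleted(String partition) {
            return completedRestorers.contains(partition);
        }

        /** Called on rebalance; the flag switches between the reported and the proposed behavior. */
        void reset() {
            needsRestoring.clear();
            if (clearCompletedOnReset) {
                completedRestorers.clear();
            }
        }
    }

    /** Returns true if a partition from an earlier assignment still looks restored after a rebalance. */
    static boolean staleAfterReassignment(boolean clearCompletedOnReset) {
        SketchChangelogReader reader = new SketchChangelogReader(clearCompletedOnReset);
        String partition = "AuditTrailBatch-store1-changelog-1";

        reader.register(partition);
        reader.restore();   // first assignment: state is fully restored
        reader.reset();     // rebalance: the task moves to another thread

        // The task comes back without a checkpoint. Nothing has been restored yet,
        // so the partition should not be considered completed at this point.
        return reader.alreadyMarkedCompleted(partition);
    }

    public static void main(String[] args) {
        System.out.println("reset() without clear: stale=" + staleAfterReassignment(false));
        System.out.println("reset() with clear:    stale=" + staleAfterReassignment(true));
    }
}
```

When reset() leaves completedRestorers untouched, the returning partition is
reported as already completed before any restoration has happened, which
matches the direct transition to RUNNING seen in the logs.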
To avoid this, the stale completedRestorers set should be cleared after every
reassignment. I added the clear operation to reset() and verified that it
works.
{code:java}
@Override
public void reset() {
    partitionInfo.clear();
    stateRestorers.clear();
    needsRestoring.clear();
    endOffsets.clear();
    needsInitializing.clear();
    // added by linyli
    completedRestorers.clear();
}{code}
> The local state not fully restored after KafkaStream rebalanced, resulting in
> data loss
> ---------------------------------------------------------------------------------------
>
> Key: KAFKA-7672
> URL: https://issues.apache.org/jira/browse/KAFKA-7672
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 1.1.0, 1.1.1, 2.0.0, 2.1.0
> Reporter: linyue li
> Priority: Major
> Fix For: 2.1.0
>
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)