[
https://issues.apache.org/jira/browse/KAFKA-6767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16737510#comment-16737510
]
Patrik Kleindl commented on KAFKA-6767:
---------------------------------------
[~guozhang] Not sure if we are hitting the same issue here in 2.0.0-cp1, but I
got here because of this log message while investigating a possible data loss
similar to the one described in
https://issues.apache.org/jira/browse/KAFKA-7672
I tried to isolate the logs for one stream thread:
{code:java}
2019-01-03 09:53:38,068 INFO
[org.apache.kafka.streams.processor.internals.StreamThread]
(service-81cb160f-70d6-4cd1-b050-d56883f81dae-StreamThread-11) - stream-thread
[service-81cb160f-70d6-4cd1-b050-d56883f81dae-StreamThread-11] partition
assignment took 150 ms.
current active tasks: [9_7, 10_6, 8_9, 7_11, 4_2, 6_3, 4_6, 9_2, 10_1, 8_3,
6_7, 4_11]
current standby tasks: []
previous active tasks: [6_0, 4_2, 8_1, 6_4, 4_6, 9_2, 10_2, 8_5, 6_8, 9_7]
2019-01-03 09:53:38,601 INFO [org.apache.kafka.clients.FetchSessionHandler]
(service-81cb160f-70d6-4cd1-b050-d56883f81dae-StreamThread-11) - [Consumer
clientId=service-81cb160f-70d6-4cd1-b050-d56883f81dae-StreamThread-11-restore-consumer,
groupId=] Error sending fetch request (sessionId=1513832101, epoch=22) to node
2: org.apache.kafka.common.errors.DisconnectException.
2019-01-03 09:53:38,602 INFO [org.apache.kafka.clients.FetchSessionHandler]
(service-81cb160f-70d6-4cd1-b050-d56883f81dae-StreamThread-11) - [Consumer
clientId=service-81cb160f-70d6-4cd1-b050-d56883f81dae-StreamThread-11-restore-consumer,
groupId=] Error sending fetch request (sessionId=1465062084, epoch=24) to node
3: org.apache.kafka.common.errors.DisconnectException.
2019-01-03 09:53:38,602 INFO [org.apache.kafka.clients.FetchSessionHandler]
(service-81cb160f-70d6-4cd1-b050-d56883f81dae-StreamThread-11) - [Consumer
clientId=service-81cb160f-70d6-4cd1-b050-d56883f81dae-StreamThread-11-restore-consumer,
groupId=] Error sending fetch request (sessionId=1411569981, epoch=28) to node
1: org.apache.kafka.common.errors.DisconnectException.
2019-01-03 09:53:38,765 INFO
[org.apache.kafka.streams.processor.internals.StreamThread]
(service-81cb160f-70d6-4cd1-b050-d56883f81dae-StreamThread-11) - stream-thread
[service-81cb160f-70d6-4cd1-b050-d56883f81dae-StreamThread-11] State transition
from PARTITIONS_ASSIGNED to RUNNING
2019-01-03 09:53:38,788 WARN
[org.apache.kafka.streams.processor.internals.ProcessorStateManager]
(service-81cb160f-70d6-4cd1-b050-d56883f81dae-StreamThread-11) - task [9_7]
Failed to write offset checkpoint file to path/9_7/.checkpoint: {}:
java.io.FileNotFoundException: path/9_7/.checkpoint.tmp (No such file or
directory)
at java.io.FileOutputStream.open0(Native Method)
at java.io.FileOutputStream.open(FileOutputStream.java:270)
at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
at java.io.FileOutputStream.<init>(FileOutputStream.java:162)
at
org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:78)
at
org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:315)
at
org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:383)
at
org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:368)
at
org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:67)
at
org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:362)
at
org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:352)
at
org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:401)
at
org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1035)
at
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:845)
at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
{code}
The last warning has been repeated every 30 seconds ever since.
> OffsetCheckpoint write assumes parent directory exists
> ------------------------------------------------------
>
> Key: KAFKA-6767
> URL: https://issues.apache.org/jira/browse/KAFKA-6767
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 1.1.0
> Reporter: Steven Schlansker
> Priority: Minor
>
> We run Kafka Streams with RocksDB state stores on ephemeral disks (i.e. if an
> instance dies it is created from scratch, rather than reusing the existing
> RocksDB.)
> We routinely see:
> {code:java}
> 2018-04-09T19:14:35.004Z WARN <>
> [chat-0319e3c3-d8b2-4c60-bd69-a8484d8d4435-StreamThread-1]
> o.a.k.s.p.i.ProcessorStateManager - task [0_11] Failed to write offset
> checkpoint file to /mnt/mesos/sandbox/storage/chat/0_11/.checkpoint: {}
> java.io.FileNotFoundException:
> /mnt/mesos/sandbox/storage/chat/0_11/.checkpoint.tmp (No such file or
> directory)
> at java.io.FileOutputStream.open0(Native Method)
> at java.io.FileOutputStream.open(FileOutputStream.java:270)
> at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
> at java.io.FileOutputStream.<init>(FileOutputStream.java:162)
> at
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:78)
> at
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:320)
> at
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:314)
> at
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
> at
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:307)
> at
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:297)
> at
> org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:67)
> at
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:357)
> at
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:347)
> at
> org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:403)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:994)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:811)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720){code}
> Inspecting the state store directory, I can indeed see that {{chat/0_11}}
> does not exist (although many other partitions do).
>
> Looking at the OffsetCheckpoint write method, it seems to try to open a new
> checkpoint file without first ensuring that the parent directory exists.
>
> {code:java}
> public void write(final Map<TopicPartition, Long> offsets) throws IOException {
>     // if there is no offsets, skip writing the file to save disk IOs
>     if (offsets.isEmpty()) {
>         return;
>     }
>
>     synchronized (lock) {
>         // write to temp file and then swap with the existing file
>         final File temp = new File(file.getAbsolutePath() + ".tmp");
> {code}
>
> Either the OffsetCheckpoint class should create the parent directories if
> needed, or some precondition of calling it should ensure that they already
> exist.
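> A minimal sketch of the first option (standalone names here are hypothetical
> stand-ins, not the actual OffsetCheckpoint code): create the parent directory
> before opening the {{.tmp}} file. {{Files.createDirectories}} is a no-op when
> the directory already exists, so this is safe to do on every commit.
>
> {code:java}
> import java.io.File;
> import java.io.IOException;
> import java.nio.charset.StandardCharsets;
> import java.nio.file.Files;
> import java.nio.file.StandardCopyOption;
>
> public class CheckpointWriteSketch {
>
>     // Hypothetical stand-in for OffsetCheckpoint.write: ensure the parent
>     // directory exists before opening the temp file, then swap it in.
>     static void writeCheckpoint(final File file, final String contents) throws IOException {
>         final File dir = file.getParentFile();
>         if (dir != null) {
>             // no-op if the directory is already there; creates it otherwise
>             Files.createDirectories(dir.toPath());
>         }
>         // write to temp file and then swap with the existing file,
>         // mirroring the pattern in the snippet above
>         final File temp = new File(file.getAbsolutePath() + ".tmp");
>         Files.write(temp.toPath(), contents.getBytes(StandardCharsets.UTF_8));
>         Files.move(temp.toPath(), file.toPath(), StandardCopyOption.REPLACE_EXISTING);
>     }
>
>     public static void main(final String[] args) throws IOException {
>         final File base = Files.createTempDirectory("checkpoint-sketch").toFile();
>         // the task directory (e.g. 0_11) does not exist yet, as in this report
>         final File checkpoint = new File(base, "0_11/.checkpoint");
>         writeCheckpoint(checkpoint, "0\n1\n");
>         System.out.println(checkpoint.isFile()); // prints "true"
>     }
> }
> {code}
>
> With the original code the same scenario throws {{FileNotFoundException}},
> because {{FileOutputStream}} will not create missing parent directories.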
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)