[
https://issues.apache.org/jira/browse/KAFKA-7464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ismael Juma updated KAFKA-7464:
-------------------------------
Fix Version/s: 2.1.0
> Fail to shutdown ReplicaManager during broker cleaned shutdown
> --------------------------------------------------------------
>
> Key: KAFKA-7464
> URL: https://issues.apache.org/jira/browse/KAFKA-7464
> Project: Kafka
> Issue Type: Bug
> Reporter: Zhanxiang (Patrick) Huang
> Assignee: Zhanxiang (Patrick) Huang
> Priority: Major
> Fix For: 2.0.1, 2.1.0
>
>
> In a 2.0 deployment, we saw the following log when shutting down the
> ReplicaManager during broker clean shutdown:
> {noformat}
> 2018/09/27 08:22:18.699 WARN [CoreUtils$] [Thread-1] [kafka-server] [] null
> java.lang.IllegalArgumentException: null
> at java.nio.Buffer.position(Buffer.java:244) ~[?:1.8.0_121]
> at sun.nio.ch.IOUtil.write(IOUtil.java:68) ~[?:1.8.0_121]
> at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) ~[?:1.8.0_121]
> at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:214) ~[kafka-clients-2.0.0.22.jar:?]
> at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:164) ~[kafka-clients-2.0.0.22.jar:?]
> at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:806) ~[kafka-clients-2.0.0.22.jar:?]
> at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:107) ~[kafka-clients-2.0.0.22.jar:?]
> at org.apache.kafka.common.network.Selector.doClose(Selector.java:751) ~[kafka-clients-2.0.0.22.jar:?]
> at org.apache.kafka.common.network.Selector.close(Selector.java:739) ~[kafka-clients-2.0.0.22.jar:?]
> at org.apache.kafka.common.network.Selector.close(Selector.java:701) ~[kafka-clients-2.0.0.22.jar:?]
> at org.apache.kafka.common.network.Selector.close(Selector.java:315) ~[kafka-clients-2.0.0.22.jar:?]
> at org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:595) ~[kafka-clients-2.0.0.22.jar:?]
> at kafka.server.ReplicaFetcherBlockingSend.close(ReplicaFetcherBlockingSend.scala:107) ~[kafka_2.11-2.0.0.22.jar:?]
> at kafka.server.ReplicaFetcherThread.initiateShutdown(ReplicaFetcherThread.scala:108) ~[kafka_2.11-2.0.0.22.jar:?]
> at kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:183) ~[kafka_2.11-2.0.0.22.jar:?]
> at kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:182) ~[kafka_2.11-2.0.0.22.jar:?]
> at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733) ~[scala-library-2.11.12.jar:?]
> at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130) ~[scala-library-2.11.12.jar:?]
> at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130) ~[scala-library-2.11.12.jar:?]
> at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236) ~[scala-library-2.11.12.jar:?]
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40) ~[scala-library-2.11.12.jar:?]
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:130) ~[scala-library-2.11.12.jar:?]
> at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732) ~[scala-library-2.11.12.jar:?]
> at kafka.server.AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:182) ~[kafka_2.11-2.0.0.22.jar:?]
> at kafka.server.ReplicaFetcherManager.shutdown(ReplicaFetcherManager.scala:37) ~[kafka_2.11-2.0.0.22.jar:?]
> at kafka.server.ReplicaManager.shutdown(ReplicaManager.scala:1471) ~[kafka_2.11-2.0.0.22.jar:?]
> at kafka.server.KafkaServer$$anonfun$shutdown$12.apply$mcV$sp(KafkaServer.scala:616) ~[kafka_2.11-2.0.0.22.jar:?]
> at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:86) ~[kafka_2.11-2.0.0.22.jar:?]
> at kafka.server.KafkaServer.shutdown(KafkaServer.scala:616) ~[kafka_2.11-2.0.0.22.jar:?]
> {noformat}
> After that, we noticed that some of the replica fetcher threads failed to
> shut down:
> {noformat}
> 2018/09/27 08:22:46.176 ERROR [LogDirFailureChannel] [ReplicaFetcherThread-26-13085] [kafka-server] [] Error while rolling log segment for video-social-gestures-30 in dir /export/content/kafka/i001_caches
> java.nio.channels.ClosedChannelException: null
> at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:110) ~[?:1.8.0_121]
> at sun.nio.ch.FileChannelImpl.size(FileChannelImpl.java:300) ~[?:1.8.0_121]
> at org.apache.kafka.common.record.FileRecords.truncateTo(FileRecords.java:244) ~[kafka-clients-2.0.0.22.jar:?]
> at org.apache.kafka.common.record.FileRecords.trim(FileRecords.java:206) ~[kafka-clients-2.0.0.22.jar:?]
> at kafka.log.LogSegment.onBecomeInactiveSegment(LogSegment.scala:512) ~[kafka_2.11-2.0.0.22.jar:?]
> at kafka.log.Log$$anonfun$roll$2$$anonfun$apply$30.apply(Log.scala:1493) ~[kafka_2.11-2.0.0.22.jar:?]
> at kafka.log.Log$$anonfun$roll$2$$anonfun$apply$30.apply(Log.scala:1493) ~[kafka_2.11-2.0.0.22.jar:?]
> at scala.Option.foreach(Option.scala:257) ~[scala-library-2.11.12.jar:?]
> at kafka.log.Log$$anonfun$roll$2.apply(Log.scala:1493) ~[kafka_2.11-2.0.0.22.jar:?]
> at kafka.log.Log$$anonfun$roll$2.apply(Log.scala:1479) ~[kafka_2.11-2.0.0.22.jar:?]
> at kafka.log.Log.maybeHandleIOException(Log.scala:1856) ~[kafka_2.11-2.0.0.22.jar:?]
> at kafka.log.Log.roll(Log.scala:1479) ~[kafka_2.11-2.0.0.22.jar:?]
> at kafka.log.Log.kafka$log$Log$$maybeRoll(Log.scala:1465) ~[kafka_2.11-2.0.0.22.jar:?]
> at kafka.log.Log$$anonfun$append$2.apply(Log.scala:868) ~[kafka_2.11-2.0.0.22.jar:?]
> at kafka.log.Log$$anonfun$append$2.apply(Log.scala:762) ~[kafka_2.11-2.0.0.22.jar:?]
> at kafka.log.Log.maybeHandleIOException(Log.scala:1856) ~[kafka_2.11-2.0.0.22.jar:?]
> at kafka.log.Log.append(Log.scala:762) ~[kafka_2.11-2.0.0.22.jar:?]
> at kafka.log.Log.appendAsFollower(Log.scala:743) ~[kafka_2.11-2.0.0.22.jar:?]
> at kafka.cluster.Partition$$anonfun$doAppendRecordsToFollowerOrFutureReplica$1.apply(Partition.scala:601) ~[kafka_2.11-2.0.0.22.jar:?]
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251) ~[kafka_2.11-2.0.0.22.jar:?]
> at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257) ~[kafka_2.11-2.0.0.22.jar:?]
> at kafka.cluster.Partition.doAppendRecordsToFollowerOrFutureReplica(Partition.scala:588) ~[kafka_2.11-2.0.0.22.jar:?]
> at kafka.cluster.Partition.appendRecordsToFollowerOrFutureReplica(Partition.scala:608) ~[kafka_2.11-2.0.0.22.jar:?]
> at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:130) ~[kafka_2.11-2.0.0.22.jar:?]
> at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:43) ~[kafka_2.11-2.0.0.22.jar:?]
> at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:188) ~[kafka_2.11-2.0.0.22.jar:?]
> at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:174) ~[kafka_2.11-2.0.0.22.jar:?]
> at scala.Option.foreach(Option.scala:257) ~[scala-library-2.11.12.jar:?]
> at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:174) ~[kafka_2.11-2.0.0.22.jar:?]
> at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:171) ~[kafka_2.11-2.0.0.22.jar:?]
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) ~[scala-library-2.11.12.jar:?]
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) ~[scala-library-2.11.12.jar:?]
> at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:171) ~[kafka_2.11-2.0.0.22.jar:?]
> at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:171) ~[kafka_2.11-2.0.0.22.jar:?]
> at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:171) ~[kafka_2.11-2.0.0.22.jar:?]
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251) ~[kafka_2.11-2.0.0.22.jar:?]
> at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:169) ~[kafka_2.11-2.0.0.22.jar:?]
> at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:115) ~[kafka_2.11-2.0.0.22.jar:?]
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82) ~[kafka_2.11-2.0.0.22.jar:?]{noformat}
> What's worse, we found that if an exception is thrown during
> ReplicaFetcherManager shutdown, we will skip the purgatory shutdowns and the
> HW checkpoint entirely, and in our case we never saw the "Shut down
> completely" log:
> {code:java}
> def shutdown(checkpointHW: Boolean = true) {
>   info("Shutting down")
>   removeMetrics()
>   if (logDirFailureHandler != null)
>     logDirFailureHandler.shutdown()
>   replicaFetcherManager.shutdown()
>   replicaAlterLogDirsManager.shutdown()
>   delayedFetchPurgatory.shutdown()
>   delayedProducePurgatory.shutdown()
>   delayedDeleteRecordsPurgatory.shutdown()
>   if (checkpointHW)
>     checkpointHighWatermarks()
>   info("Shut down completely")
> }
> {code}
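> The skip behavior can be seen in a minimal sketch (hypothetical names, not the real ReplicaManager code): since the caller wraps the whole shutdown in CoreUtils.swallow, which only logs, an exception from the fetcher-manager step abandons every later step:

```scala
// Minimal sketch of a sequential shutdown whose caller swallows exceptions:
// one failing step silently skips everything after it. Names are stand-ins.
object ShutdownOrderSketch {
  def run(fetcherShutdownFails: Boolean): Seq[String] = {
    val completed = scala.collection.mutable.ArrayBuffer.empty[String]
    try {
      // Stand-in for replicaFetcherManager.shutdown(), which can throw.
      if (fetcherShutdownFails)
        throw new IllegalArgumentException("null")
      completed += "fetchers"
      // Stand-ins for the purgatory shutdowns and checkpointHighWatermarks().
      completed += "purgatories"
      completed += "hwCheckpoint"
    } catch {
      case e: Exception => () // the caller's CoreUtils.swallow just logs a WARN
    }
    completed.toSeq
  }
}
```

> On the failure path the returned list is empty: no purgatory shutdown, no HW checkpoint, and the "Shut down completely" line is never reached.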
> The reason we see this is that since KAFKA-6051 we close leaderEndpoint in
> ReplicaFetcherThread.initiateShutdown to preempt the in-progress fetch
> request and accelerate replica fetcher thread shutdown. However,
> leaderEndpoint can throw an exception when the replica fetcher thread is
> still actively fetching.
>
> I am wondering whether we should catch the exception thrown by
> "leaderEndpoint.close()" instead of letting it propagate up the call stack.
> In my opinion, it is safe to do so because ReplicaFetcherThread.initiateShutdown
> will only be called when:
> # Server shutdown – In this case we will shut down the process anyway, so
> even if we fail to close the leader endpoint cleanly there is no harm.
> # shutdownIdleFetcherThread – In this case the fetcher thread is idle and we
> will not use it again anyway, so there is no harm either.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)