[https://issues.apache.org/jira/browse/KAFKA-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel]
Alexandre Dupriez updated KAFKA-14845:
--------------------------------------
Description:
Our production environment faced a case where registration of a broker
failed due to the presence of a "conflicting" broker znode in Zookeeper. This
case is similar to the one fixed by KAFKA-6584, which was induced by the
Zookeeper bug (or feature) tracked in ZOOKEEPER-2985, still open as of today.
A network partition disrupted communication between the Kafka and Zookeeper
clusters for about 20% of the brokers in the cluster. One of these brokers was
unable to re-register with Zookeeper and remained excluded from the cluster
until it was restarted. Broker logs show that registration failed because of a
"conflicting" znode write, which in this case does not exactly match the
scenario covered by KAFKA-6584.
The sequence of logs on the broker is as follows.
First, a connection is established with the Zookeeper node 3.
{code:java}
[2023-03-05 16:01:55,342] INFO Socket connection established, initiating
session, client: /1.2.3.4:40200, server: zk.3/5.6.7.8:2182
(org.apache.zookeeper.ClientCnxn)
[2023-03-05 16:01:55,342] INFO channel is connected: [id: 0x2b45ae40,
L:/1.1.3.4:40200 - R:zk.3/5.6.7.8:2182]
(org.apache.zookeeper.ClientCnxnSocketNetty){code}
An existing Zookeeper session had expired; upon reconnection, the Zookeeper
state change handler was invoked and the creation of the ephemeral znode
/brokers/ids/18 started on the controller thread.
{code:java}
[2023-03-05 16:01:55,345] INFO Creating /brokers/ids/18 (is it secure? false)
(kafka.zk.KafkaZkClient){code}
The client "session" timed out after 6 seconds. Note that the session id is 0x0
and that the "{_}Session establishment complete{_}" log is absent: the broker
appears never to have received or processed the response from the Zookeeper
node.
{code:java}
[2023-03-05 16:02:01,343] INFO Client session timed out, have not heard from
server in 6000ms for sessionid 0x0, closing socket connection and attempting
reconnect (org.apache.zookeeper.ClientCnxn)
[2023-03-05 16:02:01,343] INFO channel is disconnected: [id: 0x2b45ae40,
L:/1.2.3.4:40200 ! R:zk.3/5.6.7.8:2182]
(org.apache.zookeeper.ClientCnxnSocketNetty){code}
Pending requests were aborted with a {{CONNECTIONLOSS}} error and the client
started waiting on a new connection notification.
{code:java}
[2023-03-05 16:02:01,343] INFO [ZooKeeperClient Kafka server] Waiting until
connected. (kafka.zookeeper.ZooKeeperClient){code}
A new connection was created with Zookeeper node 1. Note that a valid (new)
session ({{0x1006c6e0b830001}}) was reported by Kafka this time.
{code:java}
[2023-03-05 16:02:02,037] INFO Socket connection established, initiating
session, client: /1.2.3.4:58080, server: zk.1/9.10.11.12:2182
(org.apache.zookeeper.ClientCnxn)
[2023-03-05 16:02:02,037] INFO channel is connected: [id: 0x68fba106,
L:/1.2.3.4:58080 - R:zk.1/9.10.11.12:2182]
(org.apache.zookeeper.ClientCnxnSocketNetty)
[2023-03-05 16:02:03,054] INFO Session establishment complete on server
zk.1/9.10.11.12:2182, sessionid = 0x1006c6e0b830001, negotiated timeout = 18000
(org.apache.zookeeper.ClientCnxn){code}
The Kafka ZK client is notified of the connection.
{code:java}
[2023-03-05 16:02:03,054] INFO [ZooKeeperClient Kafka server] Connected.
(kafka.zookeeper.ZooKeeperClient){code}
The broker sends the request to create the znode {{/brokers/ids/18}}, which
already exists, and the error path implemented for KAFKA-6584 is followed.
However, in this case, the session owning the ephemeral node,
{{0x300000043230ac1}} ({{216172783240153793}}), differs from the last active
Zookeeper session recorded by the broker. It also differs from the current
session {{0x1006c6e0b830001}} ({{72176813933264897}}), hence recreation of the
broker znode is not attempted.
{code:java}
[2023-03-05 16:02:04,466] ERROR Error while creating ephemeral at
/brokers/ids/18, node already exists and owner '216172783240153793' does not
match current session '72176813933264897'
(kafka.zk.KafkaZkClient$CheckedEphemeral)
org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode =
NodeExists
at org.apache.zookeeper.KeeperException.create(KeeperException.java:126)
at
kafka.zk.KafkaZkClient$CheckedEphemeral.getAfterNodeExists(KafkaZkClient.scala:1821)
at
kafka.zk.KafkaZkClient$CheckedEphemeral.create(KafkaZkClient.scala:1759)
at
kafka.zk.KafkaZkClient.checkedEphemeralCreate(KafkaZkClient.scala:1726)
at kafka.zk.KafkaZkClient.registerBroker(KafkaZkClient.scala:95)
at
kafka.controller.KafkaController.processRegisterBrokerAndReelect(KafkaController.scala:1810)
at kafka.controller.KafkaController.process(KafkaController.scala:1853)
at kafka.controller.QueuedEvent.process(ControllerEventManager.scala:51)
at
kafka.controller.ControllerEventManager$ControllerEventThread.process$1(ControllerEventManager.scala:127)
at
kafka.controller.ControllerEventManager$ControllerEventThread.$anonfun$doWork$1(ControllerEventManager.scala:130)
at
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
at
kafka.controller.ControllerEventManager$ControllerEventThread.doWork(ControllerEventManager.scala:130)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96){code}
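Session ids appear in hex in the ZooKeeper logs and in decimal in Kafka's error message. The small sketch below converts between the two. Assuming the layout used by ZooKeeper's session tracker, where the id of the server that created a session occupies the top 8 bits of the session id, it also recovers the creating server: under that assumption the orphaned owner {{0x300000043230ac1}} was created by server 3, which would be consistent with the first connection to zk.3, and the current session by server 1 (zk.1). The class and method names are hypothetical, not Kafka or ZooKeeper APIs.

{code:java}
// Hypothetical helper, not part of Kafka or ZooKeeper.
final class SessionIds {
    private SessionIds() {}

    /** Parses a session id printed as "0x..." into the long value Kafka logs in decimal. */
    static long parseHex(String hex) {
        String digits = hex.startsWith("0x") ? hex.substring(2) : hex;
        // parseUnsignedLong also accepts ids whose top bit is set (server ids >= 128).
        return Long.parseUnsignedLong(digits, 16);
    }

    /** Formats a session id the way ZooKeeper logs it. */
    static String toHex(long sessionId) {
        return "0x" + Long.toHexString(sessionId);
    }

    /** Assumption: the creating server's id is stored in the top 8 bits. */
    static int creatorServerId(long sessionId) {
        return (int) (sessionId >>> 56);
    }

    public static void main(String[] args) {
        long owner = parseHex("0x300000043230ac1");
        long current = parseHex("0x1006c6e0b830001");
        System.out.println(owner + " created by server " + creatorServerId(owner));
        System.out.println(current + " created by server " + creatorServerId(current));
    }
}
{code}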
The session {{0x300000043230ac1}} expires later on as indicated in Zookeeper
server logs:
{code:java}
[2023-03-05 16:02:21,336] INFO Expiring session 0x300000043230ac1, timeout of
18000ms exceeded (org.apache.zookeeper.server.ZooKeeperServer)
[2023-03-05 16:02:21,336] INFO Submitting global closeSession request for
session 0x300000043230ac1 (org.apache.zookeeper.server.ZooKeeperServer)
{code}
The ephemeral znode is then deleted and never recreated, so the broker remains
unregistered. At this point, only a broker restart (or a forced recreation of
the Zookeeper session) can remediate the situation.
An analysis of the Zookeeper commit logs shows the following sequence of
transactions, with timestamps from the Zookeeper node:
- 2023-03-05T16:02:00.973Z: CreationSession [-10] => 0x300000043230ac1
- 2023-03-05T16:02:02.163Z: Multi [14] = [CreateNode(/brokers/ids/18),
SetData(<BrokerInfo>)]
- 2023-03-05T16:02:21.336Z: CloseSession [-11]
The fix for KAFKA-6584 does not cover this case because, here, the session ID
is never surfaced to and recorded by the ZK client in Kafka (there were no
lower-level logs to ascertain whether the Netty client ever received a response
for that session).
As a remediation, perhaps the source of the broker's identity (currently
conveyed by the Zookeeper session ID) could be added explicitly to the znode
data. Assuming the Zookeeper Multi is atomic, the znode contains the BrokerInfo
(or any other user data provided with the SetData command) if and only if it
was successfully created.
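A minimal sketch of that remediation, under the assumption that each broker process writes a unique token (here a random UUID, a hypothetical field that does not exist in Kafka's BrokerInfo) in the data set by the same Multi that creates the znode. On NODEEXISTS, the broker compares the token read back from the znode with its own, instead of comparing the ephemeral owner with the session id it happens to know. An in-memory map stands in for ZooKeeper; the {{Payload}} type, the outcomes and the deletion/re-creation of a stale node (not shown) are all illustrative.

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical model: an in-memory map stands in for the ZooKeeper data tree.
final class TokenRegistration {
    enum Outcome { CREATED, ALREADY_REGISTERED, OWN_STALE_NODE, CONFLICT }

    /** Znode payload: broker info plus a per-process token (hypothetical field). */
    static final class Payload {
        final int brokerId;
        final String token;
        final long ownerSession;

        Payload(int brokerId, String token, long ownerSession) {
            this.brokerId = brokerId;
            this.token = token;
            this.ownerSession = ownerSession;
        }
    }

    private final Map<String, Payload> tree = new HashMap<>();

    /** Atomic create-with-data, standing in for the CreateNode + SetData multi. */
    private boolean createIfAbsent(String path, Payload payload) {
        return tree.putIfAbsent(path, payload) == null;
    }

    Outcome register(String path, int brokerId, String token, long currentSession) {
        if (createIfAbsent(path, new Payload(brokerId, token, currentSession))) {
            return Outcome.CREATED;
        }
        Payload existing = tree.get(path);
        if (!existing.token.equals(token)) {
            return Outcome.CONFLICT; // written by another broker process
        }
        // Our own earlier attempt. If another session owns it, that session will
        // eventually expire, so the node should be deleted and re-created.
        return existing.ownerSession == currentSession
            ? Outcome.ALREADY_REGISTERED
            : Outcome.OWN_STALE_NODE;
    }

    public static void main(String[] args) {
        TokenRegistration zk = new TokenRegistration();
        String token = UUID.randomUUID().toString();
        // First attempt: applied by the server, but the response never reaches the broker.
        zk.register("/brokers/ids/18", 18, token, 0x300000043230ac1L);
        // Retry under the new session: the broker recognises its own stale node.
        System.out.println(zk.register("/brokers/ids/18", 18, token, 0x1006c6e0b830001L));
    }
}
{code}

With this check, the production case above would be classified as the broker's own stale node rather than as a conflict with an unknown owner.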
----
*Update:* this can be reproduced with this [automated
test|https://github.com/Hangleton/kafka-tools/tree/master/kafka-broker-reg].
The sequence of events produced by the test is the following.
!phoque.png!
*(1)* The Zookeeper client is created by the application. It opens a TCP
connection, then sends a Connect request, which is processed on the Netty NIO
thread pool. A CreateSession request is internally enqueued to be handled by
the synchronous request processor. Once it is processed, a session id is
generated and recorded, and that session id, 0x1001764b3920000, is returned to
the client.
{code:java}
[2023-03-29 16:22:04,258] INFO Socket connection established, initiating
session, client: /127.0.0.1:50093, server: localhost/127.0.0.1:2181
(org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:04,268] INFO channel is connected: [id: 0x2e3e926b,
L:/127.0.0.1:50093 - R:localhost/127.0.0.1:2181]
(org.apache.zookeeper.ClientCnxnSocketNetty)
sessionid:0x1001764b3920000 type:createSession cxid:0x0 zxid:0xcc0 txntype:-10
reqpath:n/a
[2023-03-29 16:22:04,310] INFO Session establishment complete on server
localhost/127.0.0.1:2181, session id = 0x1001764b3920000, negotiated timeout =
18000 (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:04,315] INFO [ZooKeeperClient ZkClient] Connected.
(kafka.zookeeper.ZooKeeperClient){code}
*(2)* The Kafka client built on top of the Zookeeper client is used to register
the broker. The multiTransaction API in the Zookeeper client is invoked and a
multi request is sent to Zookeeper, containing CreateNode and SetData
operations.
{code:java}
[2023-03-29 16:22:04,470] INFO Creating /brokers/ids/18 (is it secure? false)
(kafka.zk.KafkaZkClient)
sessionid:0x1001764b3920000 type:multi cxid:0x1 zxid:0xcc1 txntype:14
reqpath:n/a
[2023-03-29 16:22:04,560] INFO Stat of the created znode at /brokers/ids/18 is:
3265,3265,1680103324542,1680103324542,1,0,0,72083315314786304,131,0,3265
(kafka.zk.KafkaZkClient)
[2023-03-29 16:22:04,561] INFO Registered broker 18 at path /brokers/ids/18
with addresses: PLAINTEXT://localhost:9092, czxid (broker epoch): 3265
(kafka.zk.KafkaZkClient){code}
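The "Stat of the created znode" line is the comma-separated rendering of ZooKeeper's {{Stat}} record. Assuming the field order czxid, mzxid, ctime, mtime, version, cversion, aversion, ephemeralOwner, dataLength, numChildren, pzxid (as we understand the {{Stat}} definition in ZooKeeper's jute schema), the hypothetical parser below decodes it: czxid 3265 is zxid 0xcc1 of the multi above (reported by Kafka as the broker epoch), version 1 is consistent with the SetData applied after the create, and the ephemeral owner is session 0x1001764b3920000.

{code:java}
// Hypothetical parser for the comma-separated Stat rendering seen in the log.
final class StatLine {
    final long czxid, mzxid, ctime, mtime;
    final int version, cversion, aversion;
    final long ephemeralOwner;
    final int dataLength, numChildren;
    final long pzxid;

    StatLine(String line) {
        String[] f = line.trim().split(",");
        if (f.length != 11) {
            throw new IllegalArgumentException("expected 11 Stat fields, got " + f.length);
        }
        czxid = Long.parseLong(f[0]);
        mzxid = Long.parseLong(f[1]);
        ctime = Long.parseLong(f[2]);
        mtime = Long.parseLong(f[3]);
        version = Integer.parseInt(f[4]);
        cversion = Integer.parseInt(f[5]);
        aversion = Integer.parseInt(f[6]);
        ephemeralOwner = Long.parseLong(f[7]);
        dataLength = Integer.parseInt(f[8]);
        numChildren = Integer.parseInt(f[9]);
        pzxid = Long.parseLong(f[10]);
    }

    public static void main(String[] args) {
        StatLine s = new StatLine(
            "3265,3265,1680103324542,1680103324542,1,0,0,72083315314786304,131,0,3265");
        System.out.println("czxid = 0x" + Long.toHexString(s.czxid));
        System.out.println("ephemeralOwner = 0x" + Long.toHexString(s.ephemeralOwner));
    }
}
{code}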
*(3)* The client sends a Ping request every 6 seconds (read timeout / 2). The
responses to these pings are not sent back to the client (this is enforced by
the test; see the "Dropping" keyword in stdout). After 12 seconds (the read
timeout), the client initiates a new connection.
{code:java}
[ALTERED RQ] sessionid:0x1001764b3920000 type:ping cxid:0xfffffffffffffffe
zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a
Dropping -2,3265,0
[ALTERED RQ] sessionid:0x1001764b3920000 type:ping cxid:0xfffffffffffffffe
zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a
Dropping -2,3265,0
[2023-03-29 16:22:16,555] WARN Client session timed out, have not heard from
server in 12001ms for session id 0x1001764b3920000
(org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:16,559] WARN Session 0x1001764b3920000 for server
localhost/127.0.0.1:2181, Closing socket connection. Attempting reconnect
except it is a SessionExpiredException. (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:16,567] INFO channel is disconnected: [id: 0x2e3e926b,
L:/127.0.0.1:50093 ! R:localhost/127.0.0.1:2181]
(org.apache.zookeeper.ClientCnxnSocketNetty){code}
*(4)* A new TCP connection is opened and the Connect request is sent. A delay is
artificially injected to make the connection request time out (the connection
timeout is equal to the session timeout in this case).
{code:java}
[2023-03-29 16:22:18,388] INFO Opening socket connection to server
localhost/[0:0:0:0:0:0:0:1]:2181. (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:18,395] INFO Socket connection established, initiating
session, client: /[0:0:0:0:0:0:0:1]:50121, server:
localhost/[0:0:0:0:0:0:0:1]:2181 (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:18,396] INFO channel is connected: [id: 0xc39fba08,
L:/[0:0:0:0:0:0:0:1]:50121 - R:localhost/[0:0:0:0:0:0:0:1]:2181]
(org.apache.zookeeper.ClientCnxnSocketNetty)
>>>> CONNECTION DELAY = 18500 ms{code}
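The timeouts in steps (3) and (4) follow from the session timeout. As we read ZooKeeper's {{ClientCnxn}} (details may vary between versions), the read timeout is two thirds of the session timeout, a ping is sent after roughly half the read timeout of idleness, and the connect timeout is the session timeout divided by the number of servers in the connect string. The hypothetical helper below reproduces that arithmetic. With a single server it gives 12000/6000/18000 ms as in the test; with a connect string of three servers (an assumption about the production setup), it would give the 6000 ms timeout seen earlier for session 0x0.

{code:java}
// Hypothetical helper reproducing the client-side timeout arithmetic described above.
final class ClientTimeouts {
    private ClientTimeouts() {}

    static int readTimeoutMs(int sessionTimeoutMs) {
        return sessionTimeoutMs * 2 / 3;
    }

    static int pingIntervalMs(int sessionTimeoutMs) {
        return readTimeoutMs(sessionTimeoutMs) / 2;
    }

    static int connectTimeoutMs(int sessionTimeoutMs, int serverCount) {
        if (serverCount <= 0) {
            throw new IllegalArgumentException("serverCount must be positive");
        }
        return sessionTimeoutMs / serverCount;
    }

    public static void main(String[] args) {
        int session = 18_000;
        System.out.printf("read=%d ping=%d connect(1 server)=%d connect(3 servers)=%d%n",
            readTimeoutMs(session), pingIntervalMs(session),
            connectTimeoutMs(session, 1), connectTimeoutMs(session, 3));
    }
}
{code}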
*(5)* After 18 seconds (both the session and the connection timeout), the
server expires the session; an internal closeSession request is generated on
the server and enqueued for processing by the synchronous request processor.
{code:java}
[2023-03-29 16:22:34,638] INFO Expiring session 0x1001764b3920000, timeout of
18000ms exceeded (org.apache.zookeeper.server.ZooKeeperServer)
sessionid:0x1001764b3920000 type:closeSession cxid:0x0 zxid:0xcc3 txntype:-11
reqpath:n/a
[2023-03-29 16:22:36,401] WARN Client session timed out, have not heard from
server in 18005ms for session id 0x1001764b3920000
(org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:36,401] WARN Session 0x1001764b3920000 for server
localhost/[0:0:0:0:0:0:0:1]:2181, Closing socket connection. Attempting
reconnect except it is a SessionExpiredException.
(org.apache.zookeeper.ClientCnxn){code}
*(6)* In parallel, the client detects the connection timeout and reconnects. A
new TCP connection is opened, and the server notifies the client that session
0x1001764b3920000 has expired.
{code:java}
[2023-03-29 16:22:37,972] INFO Opening socket connection to server
localhost/127.0.0.1:2181. (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:37,976] INFO Socket connection established, initiating
session, client: /127.0.0.1:50131, server: localhost/127.0.0.1:2181
(org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:37,976] INFO channel is connected: [id: 0x17aecbd7,
L:/127.0.0.1:50131 - R:localhost/127.0.0.1:2181]
(org.apache.zookeeper.ClientCnxnSocketNetty)
[2023-03-29 16:22:37,988] INFO Invalid session 0x1001764b3920000 for client
/127.0.0.1:50131, probably expired (org.apache.zookeeper.server.ZooKeeperServer)
[2023-03-29 16:22:37,989] WARN Unable to reconnect to ZooKeeper service,
session 0x1001764b3920000 has expired (org.apache.zookeeper.ClientCnxn){code}
*(7)* The client closes its "handle" (roughly, the client in Zookeeper
terminology) to Zookeeper.
{code:java}
[2023-03-29 16:22:37,994] INFO EventThread shut down for session:
0x1001764b3920000 (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:37,996] INFO [ZooKeeperClient ZkClient] Session expired.
(kafka.zookeeper.ZooKeeperClient)
[2023-03-29 16:22:37,998] INFO [ZooKeeperClient ZkClient] Initializing a new
session to localhost:2181. (kafka.zookeeper.ZooKeeperClient){code}
*(8)* The client creates a new handle, which initiates a new session with
Zookeeper. A new TCP connection is opened, and the createSession request is
propagated to the synchronous request processor. The session is created by the
server, but the response is never sent back to the client.
{code:java}
[2023-03-29 16:22:37,998] INFO Initiating client connection,
connectString=localhost:2181 sessionTimeout=18000
watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@649bec2e
(org.apache.zookeeper.ZooKeeper)
[2023-03-29 16:22:37,999] INFO Opening socket connection to server
localhost/127.0.0.1:2181. (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:38,005] INFO Socket connection established, initiating
session, client: /127.0.0.1:50132, server: localhost/127.0.0.1:2181
(org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:38,006] INFO channel is connected: [id: 0x435b25a4,
L:/127.0.0.1:50132 - R:localhost/127.0.0.1:2181]
(org.apache.zookeeper.ClientCnxnSocketNetty)
[ALTERED RQ] sessionid:0x1001764b3920001 type:createSession cxid:0x0 zxid:0xcc4
txntype:-10 reqpath:n/a
Dropping [Ljava.nio.ByteBuffer;@2a9f4f64{code}
*(9)* An inter-thread signal, triggered during the creation of the second
session on the synchronous request processor, is captured by the main
(application) thread, which starts a broker registration. The client
immediately sends the multi request to the server over the open TCP
connection, before receiving confirmation that the ConnectRequest was
successfully processed and without knowing the id of the current session. The
server processes the multi request and creates the znode /brokers/ids/18 under
session 0x1001764b3920001. The response is not sent to the client, which is
therefore unaware that the request has been processed.
{code:java}
[2023-03-29 16:22:38,023] INFO Creating /brokers/ids/18 (is it secure? false)
(kafka.zk.KafkaZkClient)
[ALTERED RQ] sessionid:0x1001764b3920001 type:multi cxid:0x1 zxid:0xcc5
txntype:14 reqpath:n/a
Dropping 1,3269,0
org.apache.zookeeper.MultiResponse@99bc5055{code}
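Step (9) hinges on the client writing the multi on the same connection before the ConnectResponse arrives, while the server attributes every request on that connection to the session it has just created. The toy model below illustrates that ordering, assuming a FIFO queue per connection and a server that drops every response, as the test does. All types are hypothetical.

{code:java}
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Toy model: one connection, requests processed in FIFO order, all responses dropped.
final class PipelinedConnection {
    enum Type { CONNECT, CREATE_EPHEMERAL }

    static final class Request {
        final Type type;
        final String path;

        Request(Type type, String path) {
            this.type = type;
            this.path = path;
        }
    }

    /** Server-side state: ephemeral znode path -> owning session id. */
    final Map<String, Long> ephemeralOwners = new HashMap<>();
    /** Session id known to the client: never updated, since the ConnectResponse is dropped. */
    final long clientSessionId = 0L;

    void serve(List<Request> sentByClient, long sessionIdToAssign) {
        Queue<Request> wire = new ArrayDeque<>(sentByClient);
        long connectionSession = 0L;
        while (!wire.isEmpty()) {
            Request r = wire.poll();
            switch (r.type) {
                case CONNECT:
                    // The server creates the session; the response never reaches the client.
                    connectionSession = sessionIdToAssign;
                    break;
                case CREATE_EPHEMERAL:
                    // Owned by the connection's session, which the client does not know.
                    ephemeralOwners.putIfAbsent(r.path, connectionSession);
                    break;
            }
        }
    }

    public static void main(String[] args) {
        PipelinedConnection c = new PipelinedConnection();
        c.serve(List.of(new Request(Type.CONNECT, null),
                        new Request(Type.CREATE_EPHEMERAL, "/brokers/ids/18")),
                0x1001764b3920001L);
        System.out.printf("owner=0x%x, client session=0x%x%n",
            c.ephemeralOwners.get("/brokers/ids/18"), c.clientSessionId);
    }
}
{code}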
*(10)* The connection times out again, since no response is sent to the client
within 18 seconds. The Zookeeper client library returns a CONNECTIONLOSS error
for the multi request. Kafka's ZooKeeper client, built on top of it, retries on
this type of error, so it waits for a new connection to be established.
{code:java}
[2023-03-29 16:22:56,012] WARN Client session timed out, have not heard from
server in 18005ms for session id 0x0 (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:56,014] WARN Session 0x0 for server localhost/127.0.0.1:2181,
Closing socket connection. Attempting reconnect except it is a
SessionExpiredException. (org.apache.zookeeper.ClientCnxn){code}
*(11)* A new connection attempt is made. This time, we allow it to go
through.
{code:java}
[2023-03-29 16:22:57,490] INFO Opening socket connection to server
localhost/127.0.0.1:2181. (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:57,494] INFO Socket connection established, initiating
session, client: /127.0.0.1:50166, server: localhost/127.0.0.1:2181
(org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:57,495] INFO channel is connected: [id: 0xa086adad,
L:/127.0.0.1:50166 - R:localhost/127.0.0.1:2181]
(org.apache.zookeeper.ClientCnxnSocketNetty)
sessionid:0x1001764b3920002 type:createSession cxid:0x0 zxid:0xcc6 txntype:-10
reqpath:n/a
[2023-03-29 16:22:57,522] INFO Session establishment complete on server
localhost/127.0.0.1:2181, session id = 0x1001764b3920002, negotiated timeout =
18000 (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:57,522] INFO [ZooKeeperClient ZkClient] Connected.
(kafka.zookeeper.ZooKeeperClient){code}
*(12)* The multi request is retried and a NODEEXISTS response is received.
Kafka then sends a getData request to Zookeeper to find the ephemeral owner of
the znode. The ephemeral owner is 0x1001764b3920001 (72083315314786305), which
matches neither the current session 0x1001764b3920002 (72083315314786306) nor
the session recorded by the Kafka client during the first znode creation
(0x1001764b3920000).
{code:java}
sessionid:0x1001764b3920002 type:multi cxid:0x2 zxid:0xcc7 txntype:14
reqpath:n/a
sessionid:0x1001764b3920002 type:getData cxid:0x3 zxid:0xfffffffffffffffe
txntype:unknown reqpath:/brokers/ids/18
[2023-03-29 16:22:57,539] ERROR Error while creating ephemeral at
/brokers/ids/18, node already exists and owner '72083315314786305' does not
match current session '72083315314786306'
(kafka.zk.KafkaZkClient$CheckedEphemeral)
org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode =
NodeExists
at org.apache.zookeeper.KeeperException.create(KeeperException.java:126)
at
kafka.zk.KafkaZkClient$CheckedEphemeral.getAfterNodeExists(KafkaZkClient.scala:2185)
at kafka.zk.KafkaZkClient$CheckedEphemeral.create(KafkaZkClient.scala:2123)
at kafka.zk.KafkaZkClient.checkedEphemeralCreate(KafkaZkClient.scala:2090)
at kafka.zk.KafkaZkClient.registerBroker(KafkaZkClient.scala:102)
at repro.BrokerRegistrationTest.main(BrokerRegistrationTest.java:172)
{code}
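The outcome of step (12) can be reproduced with a simplified model of the NODEEXISTS handling as described in this ticket for KAFKA-6584: accept the node if its owner is the current session, re-create it if the owner is the session the client last recorded, and fail otherwise. This is our paraphrase of the behaviour, not Kafka's actual code, and all names are hypothetical.

{code:java}
// Simplified paraphrase of the NODEEXISTS handling discussed above (not Kafka's code).
final class EphemeralOwnerCheck {
    enum Decision { ALREADY_OWNED, RECREATE, FAIL_NODE_EXISTS }

    static Decision onNodeExists(long ephemeralOwner, long currentSession, long lastRecordedSession) {
        if (ephemeralOwner == currentSession) {
            return Decision.ALREADY_OWNED;
        }
        if (ephemeralOwner == lastRecordedSession) {
            // The owner is a session this client knew about: delete and re-create.
            return Decision.RECREATE;
        }
        // The owner is a session the client never learned about, as in step (9).
        return Decision.FAIL_NODE_EXISTS;
    }

    public static void main(String[] args) {
        long owner = 0x1001764b3920001L;        // created in step (9), never acknowledged
        long current = 0x1001764b3920002L;      // established in step (11)
        long lastRecorded = 0x1001764b3920000L; // recorded in step (1)
        System.out.println(onNodeExists(owner, current, lastRecorded)); // FAIL_NODE_EXISTS
        System.out.println(owner + " vs " + current); // 72083315314786305 vs 72083315314786306
    }
}
{code}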
Note that, on the diagram, we introduce a delay in the expiration of session
0x1001764b3920001. The expiration of a session, and the deletion of its
associated ephemeral nodes, is scheduled on the synchronous processor
asynchronously with respect to incoming requests, so the multi request can
arrive and be consumed by the synchronous processor before the closeSession
request for 0x1001764b3920001. The ephemeral znode for that session is
therefore still present in Zookeeper's data tree when the new multi request is
processed.
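The race described above depends only on the order in which the single-threaded synchronous processor consumes the new multi and the internal closeSession. The toy model below, assuming one FIFO queue and removal of a session's ephemeral nodes when it is closed, starts from the state left by step (9) and applies both orderings. All names are hypothetical.

{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of a single-threaded processor applying queued requests in order.
final class ProcessorOrdering {
    /** A queued request: an internal closeSession, or an ephemeral create. */
    static final class Op {
        final boolean closeSession;
        final long session;
        final String path;

        private Op(boolean closeSession, long session, String path) {
            this.closeSession = closeSession;
            this.session = session;
            this.path = path;
        }

        static Op close(long session) { return new Op(true, session, null); }

        static Op create(long session, String path) { return new Op(false, session, path); }
    }

    /** Ephemeral znode path -> owning session id. */
    final Map<String, Long> ephemeralOwners = new HashMap<>();
    final List<String> results = new ArrayList<>();

    void apply(List<Op> queue) {
        for (Op op : queue) {
            if (op.closeSession) {
                // Closing a session deletes every ephemeral znode it owns.
                ephemeralOwners.values().removeIf(owner -> owner == op.session);
                results.add("closed");
            } else if (ephemeralOwners.putIfAbsent(op.path, op.session) == null) {
                results.add("OK");
            } else {
                results.add("NODEEXISTS");
            }
        }
    }

    /** Starts from the state left by step (9): the znode is owned by the stale session. */
    static ProcessorOrdering run(List<Op> queue) {
        ProcessorOrdering p = new ProcessorOrdering();
        p.ephemeralOwners.put("/brokers/ids/18", 0x1001764b3920001L);
        p.apply(queue);
        return p;
    }

    public static void main(String[] args) {
        long stale = 0x1001764b3920001L;
        long current = 0x1001764b3920002L;
        String path = "/brokers/ids/18";
        ProcessorOrdering closeFirst = run(List.of(Op.close(stale), Op.create(current, path)));
        ProcessorOrdering multiFirst = run(List.of(Op.create(current, path), Op.close(stale)));
        System.out.println("closeSession first: " + closeFirst.results + " " + closeFirst.ephemeralOwners);
        System.out.println("multi first: " + multiFirst.results + " " + multiFirst.ephemeralOwners);
    }
}
{code}

When the multi is consumed first, it fails with NODEEXISTS and the subsequent closeSession leaves no broker znode at all, which matches the unregistered broker observed in production.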
was:
Our production environment faced a use case where registration of a broker
failed due to the presence of a "conflicting" broker znode in Zookeeper. This
case is not without familiarity to that fixed by KAFKA-6584 and induced by the
Zookeeper bug (or feature) tracked in ZOOKEEPER-2985 opened as of today.
A network partition disturbed communication channels between the Kafka and
Zookeeper clusters for about 20% of the brokers in the cluster. One of this
broker was not able to re-register with Zookeeper and was excluded from the
cluster until it was restarted. Broker logs show the failed registration due to
a "conflicting" znode write which in this case does not exactly match the
scenario covered by KAFKA-6584.
The sequence of logs on the broker is as follows.
First, a connection is established with the Zookeeper node 3.
{code:java}
[2023-03-05 16:01:55,342] INFO Socket connection established, initiating
session, client: /1.2.3.4:40200, server: zk.3/5.6.7.8:2182
(org.apache.zookeeper.ClientCnxn)
[2023-03-05 16:01:55,342] INFO channel is connected: [id: 0x2b45ae40,
L:/1.1.3.4:40200 - R:zk.3/5.6.7.8:2182]
(org.apache.zookeeper.ClientCnxnSocketNetty){code}
An existing Zookeeper session was expired, and upon reconnection, the Zookeeper
state change handler was invoked. The creation of the ephemeral znode
/brokers/ids/18 started on the controller thread.
{code:java}
[2023-03-05 16:01:55,345] INFO Creating /brokers/ids/18 (is it secure? false)
(kafka.zk.KafkaZkClient){code}
The client "session" timed out after 6 seconds. Note the session is 0x0 and the
absence of "{_}Session establishment complete{_}" log: the broker appears to
have never received or processed the response from the Zookeeper node.
{code:java}
[2023-03-05 16:02:01,343] INFO Client session timed out, have not heard from
server in 6000ms for sessionid 0x0, closing socket connection and attempting
reconnect (org.apache.zookeeper.ClientCnxn)
[2023-03-05 16:02:01,343] INFO channel is disconnected: [id: 0x2b45ae40,
L:/1.2.3.4:40200 ! R:zk.3/5.6.7.8:2182]
(org.apache.zookeeper.ClientCnxnSocketNetty){code}
Pending requests were aborted with a {{CONNECTIONLOSS}} error and the client
started waiting on a new connection notification.
{code:java}
[2023-03-05 16:02:01,343] INFO [ZooKeeperClient Kafka server] Waiting until
connected. (kafka.zookeeper.ZooKeeperClient){code}
A new connection was created with the Zookeeper node 1. Note that a valid (new)
session ({{{}0x1006c6e0b830001{}}}) was reported by Kafka this time.
{code:java}
[2023-03-05 16:02:02,037] INFO Socket connection established, initiating
session, client: /1.2.3.4:58080, server: zk.1/9.10.11.12:2182
(org.apache.zookeeper.ClientCnxn)
[2023-03-05 16:02:02,037] INFO channel is connected: [id: 0x68fba106,
L:/1.2.3.4:58080 - R:zk.1/9.10.11.12:2182]
(org.apache.zookeeper.ClientCnxnSocketNetty)
[2023-03-05 16:02:03,054] INFO Session establishment complete on server
zk.1/9.10.11.12:2182, sessionid = 0x1006c6e0b830001, negotiated timeout = 18000
(org.apache.zookeeper.ClientCnxn){code}
The Kafka ZK client is notified of the connection.
{code:java}
[2023-03-05 16:02:03,054] INFO [ZooKeeperClient Kafka server] Connected.
(kafka.zookeeper.ZooKeeperClient){code}
The broker sends the request to create the znode {{/brokers/ids/18}} which
already exists. The error path implemented for KAFKA-6584 is then followed.
However, in this case, the session owning the ephemeral node
{{0x300000043230ac1}} ({{{}216172783240153793{}}}) is different from the last
active Zookeeper session which the broker has recorded. And it is also
different from the current session {{0x1006c6e0b830001}}
({{{}72176813933264897{}}}), hence the recreation of the broker znode is not
attempted.
{code:java}
[2023-03-05 16:02:04,466] ERROR Error while creating ephemeral at
/brokers/ids/18, node already exists and owner '216172783240153793' does not
match current session '72176813933264897'
(kafka.zk.KafkaZkClient$CheckedEphemeral)
org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode =
NodeExists
at org.apache.zookeeper.KeeperException.create(KeeperException.java:126)
at
kafka.zk.KafkaZkClient$CheckedEphemeral.getAfterNodeExists(KafkaZkClient.scala:1821)
at
kafka.zk.KafkaZkClient$CheckedEphemeral.create(KafkaZkClient.scala:1759)
at
kafka.zk.KafkaZkClient.checkedEphemeralCreate(KafkaZkClient.scala:1726)
at kafka.zk.KafkaZkClient.registerBroker(KafkaZkClient.scala:95)
at
kafka.controller.KafkaController.processRegisterBrokerAndReelect(KafkaController.scala:1810)
at kafka.controller.KafkaController.process(KafkaController.scala:1853)
at kafka.controller.QueuedEvent.process(ControllerEventManager.scala:51)
at
kafka.controller.ControllerEventManager$ControllerEventThread.process$1(ControllerEventManager.scala:127)
at
kafka.controller.ControllerEventManager$ControllerEventThread.$anonfun$doWork$1(ControllerEventManager.scala:130)
at
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
at
kafka.controller.ControllerEventManager$ControllerEventThread.doWork(ControllerEventManager.scala:130)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96){code}
The session {{0x300000043230ac1}} expires later on as indicated in Zookeeper
server logs:
{code:java}
[2023-03-05 16:02:21,336] INFO Expiring session 0x300000043230ac1, timeout of
18000ms exceeded (org.apache.zookeeper.server.ZooKeeperServer)
[2023-03-05 16:02:21,336] INFO Submitting global closeSession request for
session 0x300000043230ac1 (org.apache.zookeeper.server.ZooKeeperServer)
{code}
The ephemeral znode is then deleted and never recreated. The broker is not
registered. Only a broker restart (or forced recreation of the Zookeeper
session) can mitigate at this point.
An analysis of the commit logs from Zookeeper does show the following sequence
of transactions with timestamps from the Zookeeper node.
- 2023-03-05T16:02:00.973Z: CreationSession [-10] => 0x300000043230ac1
- 2023-03-05T16:02:02.163Z: Multi [14] = [CreateNode(/brokers/ids/18),
SetData(<BrokerInfo>)]
- 2023-03-05T16:02:21.336Z: CloseSession [-11]
The fix for KAFKA-6584 does not cover this case because here, the session ID is
not surfaced and recorded by the ZK client in Kafka (there was no lower-level
logs to ascertain if the Netty client ever received any response for that
session).
As a remediation, perhaps the source of identity of the broker (currently
conveyed by the Zookeeper session ID) could be explicitly added to the znode
data (assuming the Zookeeper Multi is atomic, the znode must have the
BrokerInfo (or any other user data provided with the SetData command) if and
only if it is successfully created).
----
*Update:* this can be reproduced with this [automated
test|https://github.com/Hangleton/kafka-tools/tree/master/kafka-broker-reg].
The sequence of events produced by the test is the following.
!phoque.png!
1) The Zookeeper client is created by the application. It opens a TCP
connection, then send a Connect request which is processed on the Netty NIO
thread pool. A CreateSession request is internally enqueued to be handled by
the synchronous request processor. Once processed, a session id is generated
and recorded, and that session id 0x1001764b3920000 is returned to the client.
{code:java}
[2023-03-29 16:22:04,258] INFO Socket connection established, initiating
session, client: /127.0.0.1:50093, server: localhost/127.0.0.1:2181
(org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:04,268] INFO channel is connected: [id: 0x2e3e926b,
L:/127.0.0.1:50093 - R:localhost/127.0.0.1:2181]
(org.apache.zookeeper.ClientCnxnSocketNetty)
sessionid:0x1001764b3920000 type:createSession cxid:0x0 zxid:0xcc0 txntype:-10
reqpath:n/a
[2023-03-29 16:22:04,310] INFO Session establishment complete on server
localhost/127.0.0.1:2181, session id = 0x1001764b3920000, negotiated timeout =
18000 (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:04,315] INFO [ZooKeeperClient ZkClient] Connected.
(kafka.zookeeper.ZooKeeperClient){code}
2) The Kafka client on top of the Zookeeper client is used to register the
broker. The multiTransaction API in the Zookeeper client is invoked and a multi
request is sent to Zookeeper, with a CreateNode and SetData transactions.
{code:java}
[2023-03-29 16:22:04,470] INFO Creating /brokers/ids/18 (is it secure? false)
(kafka.zk.KafkaZkClient)
sessionid:0x1001764b3920000 type:multi cxid:0x1 zxid:0xcc1 txntype:14
reqpath:n/a
[2023-03-29 16:22:04,560] INFO Stat of the created znode at /brokers/ids/18 is:
3265,3265,1680103324542,1680103324542,1,0,0,72083315314786304,131,0,3265
(kafka.zk.KafkaZkClient)
[2023-03-29 16:22:04,561] INFO Registered broker 18 at path /brokers/ids/18
with addresses: PLAINTEXT://localhost:9092, czxid (broker epoch): 3265
(kafka.zk.KafkaZkClient){code}
3) The client generates Ping request every 6 seconds (read timeout / 2). The
response to these pings are not sent back to the client (enforced by the test,
see the "Dropping" keyword in stdout). After 12 seconds (read timeout), the
client initiates a new connection.
{code:java}
[ALTERED RQ] sessionid:0x1001764b3920000 type:ping cxid:0xfffffffffffffffe
zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a
Dropping -2,3265,0
[ALTERED RQ] sessionid:0x1001764b3920000 type:ping cxid:0xfffffffffffffffe
zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a
Dropping -2,3265,0
[2023-03-29 16:22:16,555] WARN Client session timed out, have not heard from
server in 12001ms for session id 0x1001764b3920000
(org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:16,559] WARN Session 0x1001764b3920000 for server
localhost/127.0.0.1:2181, Closing socket connection. Attempting reconnect
except it is a SessionExpiredException. (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:16,567] INFO channel is disconnected: [id: 0x2e3e926b,
L:/127.0.0.1:50093 ! R:localhost/127.0.0.1:2181]
(org.apache.zookeeper.ClientCnxnSocketNetty){code}
4) A new TCP connection is opened and the Connect request sent. A delay is
artificially injected to make the connection request time out (connection
timeout, which is equal to the session timeout in this case).
{code:java}
[2023-03-29 16:22:18,388] INFO Opening socket connection to server
localhost/[0:0:0:0:0:0:0:1]:2181. (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:18,395] INFO Socket connection established, initiating
session, client: /[0:0:0:0:0:0:0:1]:50121, server:
localhost/[0:0:0:0:0:0:0:1]:2181 (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:18,396] INFO channel is connected: [id: 0xc39fba08,
L:/[0:0:0:0:0:0:0:1]:50121 - R:localhost/[0:0:0:0:0:0:0:1]:2181]
(org.apache.zookeeper.ClientCnxnSocketNetty)
>>>> CONNECTION DELAY = 18500 ms{code}
5) After 18 seconds (which is both the session and connection timeout), the
server expires the session and an internal closeSession is generated on the
server and enqueued to be processed by the synchronous request processor.
{code:java}
[2023-03-29 16:22:34,638] INFO Expiring session 0x1001764b3920000, timeout of
18000ms exceeded (org.apache.zookeeper.server.ZooKeeperServer)
sessionid:0x1001764b3920000 type:closeSession cxid:0x0 zxid:0xcc3 txntype:-11
reqpath:n/a
[2023-03-29 16:22:36,401] WARN Client session timed out, have not heard from
server in 18005ms for session id 0x1001764b3920000
(org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:36,401] WARN Session 0x1001764b3920000 for server
localhost/[0:0:0:0:0:0:0:1]:2181, Closing socket connection. Attempting
reconnect except it is a SessionExpiredException.
(org.apache.zookeeper.ClientCnxn){code}
6) In parallel, the client detects the connection timeout and reconnects. A new
TCP connection is opened, the client is notified by the server that the session
0x10016b7f51c0000 has expired.
{code:java}
[2023-03-29 16:22:37,972] INFO Opening socket connection to server
localhost/127.0.0.1:2181. (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:37,976] INFO Socket connection established, initiating
session, client: /127.0.0.1:50131, server: localhost/127.0.0.1:2181
(org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:37,976] INFO channel is connected: [id: 0x17aecbd7,
L:/127.0.0.1:50131 - R:localhost/127.0.0.1:2181]
(org.apache.zookeeper.ClientCnxnSocketNetty)
[2023-03-29 16:22:37,988] INFO Invalid session 0x1001764b3920000 for client
/127.0.0.1:50131, probably expired (org.apache.zookeeper.server.ZooKeeperServer)
[2023-03-29 16:22:37,989] WARN Unable to reconnect to ZooKeeper service,
session 0x1001764b3920000 has expired (org.apache.zookeeper.ClientCnxn){code}
7) The client closes its "handle" (~client in Zookeeper terminology) with
Zookeeper.
{code:java}
[2023-03-29 16:22:37,994] INFO EventThread shut down for session:
0x1001764b3920000 (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:37,996] INFO [ZooKeeperClient ZkClient] Session expired.
(kafka.zookeeper.ZooKeeperClient)
[2023-03-29 16:22:37,998] INFO [ZooKeeperClient ZkClient] Initializing a new
session to localhost:2181. (kafka.zookeeper.ZooKeeperClient){code}
8) The client creates a new handle, which initiates a new session with
Zookeeper. A new TCP connection is opened, and the createSession request is
propagated to the synchronous request processor. The session is created by the
server, but the response is never sent back to the client.
{code:java}
[2023-03-29 16:22:37,998] INFO Initiating client connection,
connectString=localhost:2181 sessionTimeout=18000
watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@649bec2e
(org.apache.zookeeper.ZooKeeper)
[2023-03-29 16:22:37,999] INFO Opening socket connection to server
localhost/127.0.0.1:2181. (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:38,005] INFO Socket connection established, initiating
session, client: /127.0.0.1:50132, server: localhost/127.0.0.1:2181
(org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:38,006] INFO channel is connected: [id: 0x435b25a4,
L:/127.0.0.1:50132 - R:localhost/127.0.0.1:2181]
(org.apache.zookeeper.ClientCnxnSocketNetty)
[ALTERED RQ] sessionid:0x1001764b3920001 type:createSession cxid:0x0 zxid:0xcc4
txntype:-10 reqpath:n/a
Dropping [Ljava.nio.ByteBuffer;@2a9f4f64{code}
9) An inter-thread signal triggered during the creation of the second session
on the synchronous request processor is captured by the main (application)
thread, which starts a broker registration. The client immediately sends the
multi request to the server over the open TCP connection, before receiving
confirmation that the ConnectRequest was processed successfully and without
knowing the id of the current session. The server processes the multi request
and creates the znode /brokers/ids/18 under session 0x1001764b3920001. The
response is not sent back, so the client is never made aware that the request
has been processed.
{code:java}
[2023-03-29 16:22:38,023] INFO Creating /brokers/ids/18 (is it secure? false)
(kafka.zk.KafkaZkClient)
[ALTERED RQ] sessionid:0x1001764b3920001 type:multi cxid:0x1 zxid:0xcc5
txntype:14 reqpath:n/a
Dropping 1,3269,0
org.apache.zookeeper.MultiResponse@99bc5055{code}
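Steps 8 and 9 can be sketched with a similar toy model. It assumes, as the logs above suggest, that the server executes requests received on a connection under the session it created for that connection; `PipelinedConnectModel`, `Server` and `Client` are hypothetical classes, not the ZooKeeper API. The sketch shows how the znode ends up owned by a session the client has never learned about.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Toy model of steps 8-9 (hypothetical classes, not the ZooKeeper API).
class PipelinedConnectModel {
    static final class Server {
        final Map<String, Long> ephemeralOwners = new HashMap<>(); // path -> owning session
        private long nextSessionId = 0x1001764b3920001L;

        // Processing a ConnectRequest: the session exists server-side from this point on.
        long createSession() { return nextSessionId++; }

        // Processing a create on behalf of the session bound to the connection.
        boolean createEphemeral(long connectionSession, String path) {
            return ephemeralOwners.putIfAbsent(path, connectionSession) == null;
        }
    }

    static final class Client {
        long knownSessionId = 0L;                         // learned only from the ConnectResponse
        final Deque<String> pending = new ArrayDeque<>(); // requests awaiting a response
    }

    public static void main(String[] args) {
        Server server = new Server();
        Client client = new Client();

        // Step 8: the session is created, but the ConnectResponse is dropped.
        long serverSide = server.createSession();

        // Step 9: the client pipelines the create on the same connection while still at 0x0.
        client.pending.add("/brokers/ids/18");
        server.createEphemeral(serverSide, client.pending.peek());
        // The response is dropped as well, so the request stays pending on the client.

        System.out.println(Long.toHexString(client.knownSessionId));                         // 0
        System.out.println(Long.toHexString(server.ephemeralOwners.get("/brokers/ids/18"))); // 1001764b3920001
    }
}
```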
10) The connection times out again, since no response reaches the client within
18 seconds. The Zookeeper client library fails the pending multi request with a
CONNECTIONLOSS error. The Zookeeper client built on top of it in Kafka retries
on this type of error, so it waits for a new connection to be established and
then retries the request.
{code:java}
[2023-03-29 16:22:56,012] WARN Client session timed out, have not heard from
server in 18005ms for session id 0x0 (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:56,014] WARN Session 0x0 for server localhost/127.0.0.1:2181,
Closing socket connection. Attempting reconnect except it is a
SessionExpiredException. (org.apache.zookeeper.ClientCnxn){code}
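The timeouts seen in steps 3, 4 and 10 can be reproduced with simple arithmetic. The sketch assumes that the Zookeeper client derives a read timeout of two thirds of the session timeout, pings at half the read timeout, and uses a connect timeout equal to the session timeout divided by the number of servers in the connect string, applying the connect timeout while no session is established (session id 0x0). This is our reading of `ClientCnxn`, not a documented contract, and `ClientTimeouts` is a hypothetical helper. The last line additionally assumes a three-server connect string, which would be consistent with the 6000ms timeout logged for session 0x0 in the production case.

```java
// Back-of-the-envelope model of the client-side timeouts (assumed to mirror ClientCnxn).
class ClientTimeouts {
    static int readTimeoutMs(int sessionTimeoutMs) { return sessionTimeoutMs * 2 / 3; }

    static int pingIntervalMs(int sessionTimeoutMs) { return readTimeoutMs(sessionTimeoutMs) / 2; }

    static int connectTimeoutMs(int sessionTimeoutMs, int hostCount) { return sessionTimeoutMs / hostCount; }

    // Read timeout once a session is established; connect timeout before that.
    static int effectiveTimeoutMs(boolean connected, int sessionTimeoutMs, int hostCount) {
        return connected ? readTimeoutMs(sessionTimeoutMs) : connectTimeoutMs(sessionTimeoutMs, hostCount);
    }

    public static void main(String[] args) {
        int session = 18000;
        System.out.println(pingIntervalMs(session));               // 6000: ping interval (step 3)
        System.out.println(effectiveTimeoutMs(true, session, 1));  // 12000: read timeout (step 3)
        System.out.println(effectiveTimeoutMs(false, session, 1)); // 18000: session 0x0 (steps 4 and 10)
        System.out.println(effectiveTimeoutMs(false, session, 3)); // 6000: assumed 3-server production case
    }
}
```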
11) A new connection attempt is made. This time, we allow it to go
through.
{code:java}
[2023-03-29 16:22:57,490] INFO Opening socket connection to server
localhost/127.0.0.1:2181. (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:57,494] INFO Socket connection established, initiating
session, client: /127.0.0.1:50166, server: localhost/127.0.0.1:2181
(org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:57,495] INFO channel is connected: [id: 0xa086adad,
L:/127.0.0.1:50166 - R:localhost/127.0.0.1:2181]
(org.apache.zookeeper.ClientCnxnSocketNetty)
sessionid:0x1001764b3920002 type:createSession cxid:0x0 zxid:0xcc6 txntype:-10
reqpath:n/a
[2023-03-29 16:22:57,522] INFO Session establishment complete on server
localhost/127.0.0.1:2181, session id = 0x1001764b3920002, negotiated timeout =
18000 (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:57,522] INFO [ZooKeeperClient ZkClient] Connected.
(kafka.zookeeper.ZooKeeperClient){code}
12) The multi request is retried and the response NODEEXISTS is received. Kafka
then sends a getData request to Zookeeper to find the ephemeral owner of the
znode. The ephemeral owner is 0x1001764b3920001 (72083315314786305), which
matches neither the current session 0x1001764b3920002 (72083315314786306) nor
the session recorded by the Kafka client during the first znode creation
(0x1001764b3920000).
{code:java}
sessionid:0x1001764b3920002 type:multi cxid:0x2 zxid:0xcc7 txntype:14
reqpath:n/a
sessionid:0x1001764b3920002 type:getData cxid:0x3 zxid:0xfffffffffffffffe
txntype:unknown reqpath:/brokers/ids/18
[2023-03-29 16:22:57,539] ERROR Error while creating ephemeral at
/brokers/ids/18, node already exists and owner '72083315314786305' does not
match current session '72083315314786306'
(kafka.zk.KafkaZkClient$CheckedEphemeral)
org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode =
NodeExists
at org.apache.zookeeper.KeeperException.create(KeeperException.java:126)
at
kafka.zk.KafkaZkClient$CheckedEphemeral.getAfterNodeExists(KafkaZkClient.scala:2185)
at kafka.zk.KafkaZkClient$CheckedEphemeral.create(KafkaZkClient.scala:2123)
at kafka.zk.KafkaZkClient.checkedEphemeralCreate(KafkaZkClient.scala:2090)
at kafka.zk.KafkaZkClient.registerBroker(KafkaZkClient.scala:102)
at repro.BrokerRegistrationTest.main(BrokerRegistrationTest.java:172)
{code}
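The mismatch in step 12 is easier to see once the hex session ids logged by Zookeeper are converted to the decimal ids logged by `KafkaZkClient`. The sketch is a simplified, hypothetical version of the ownership check (`EphemeralOwnerCheck` is not Kafka's code): after NODEEXISTS, the create is accepted only if the znode's ephemeral owner is the client's current session.

```java
// Simplified sketch of the ownership check described in step 12 (not Kafka's actual code).
class EphemeralOwnerCheck {
    // Zookeeper logs session ids in hex; KafkaZkClient logs them in decimal.
    static long parseSessionId(String hex) { return Long.decode(hex); }

    // After NODEEXISTS, treat the create as successful only if the current session owns the znode.
    static boolean ownedByCurrentSession(long ephemeralOwner, long currentSession) {
        return ephemeralOwner == currentSession;
    }

    public static void main(String[] args) {
        long owner = parseSessionId("0x1001764b3920001");    // created in step 9, unknown to the client
        long current = parseSessionId("0x1001764b3920002");  // established in step 11
        long recorded = parseSessionId("0x1001764b3920000"); // recorded during the first creation
        System.out.println(owner);                                 // 72083315314786305
        System.out.println(current);                               // 72083315314786306
        System.out.println(ownedByCurrentSession(owner, current)); // false: NodeExistsException
        System.out.println(owner == recorded);                     // false
    }
}
```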
Note that on the diagram, we introduce a delay in the expiration of the session
0x1001764b3920001. The expiration of the session, and the deletion of its
associated ephemeral znodes, is scheduled on the synchronous processor
asynchronously with respect to incoming requests, so the retried multi request
can reach, and be processed by, the synchronous processor before the
closeSession request for 0x1001764b3920001. The ephemeral znode owned by that
session is therefore still present in Zookeeper's data tree when the new multi
request is processed.
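The race described in this note can be illustrated with a single-threaded FIFO queue standing in for the synchronous request processor. The types are hypothetical, the multi is reduced to a plain create of /brokers/ids/18, and the real server pipeline has more stages; the sketch only shows that the outcome depends on whether the retried create or the delayed closeSession is dequeued first.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Toy model (hypothetical types): one thread applies requests strictly in FIFO order.
class SyncProcessorOrdering {
    static final long ORPHANED = 0x1001764b3920001L; // session created in step 8
    static final long CURRENT = 0x1001764b3920002L;  // session created in step 11
    static final String PATH = "/brokers/ids/18";

    static final class Request {
        final boolean closeSession; // true: closeSession, false: create of PATH
        final long sessionId;
        Request(boolean closeSession, long sessionId) {
            this.closeSession = closeSession;
            this.sessionId = sessionId;
        }
    }

    // Returns true if the create issued by CURRENT succeeds.
    static boolean currentCreateSucceeds(Queue<Request> queue) {
        Map<String, Long> ephemeralOwners = new HashMap<>();
        ephemeralOwners.put(PATH, ORPHANED); // znode created by the unacknowledged multi of step 9
        boolean succeeded = false;
        while (!queue.isEmpty()) {
            Request r = queue.poll();
            if (r.closeSession) {
                // Closing a session deletes every ephemeral znode it owns.
                ephemeralOwners.values().removeIf(owner -> owner == r.sessionId);
            } else {
                boolean created = ephemeralOwners.putIfAbsent(PATH, r.sessionId) == null;
                if (r.sessionId == CURRENT) succeeded = created;
            }
        }
        return succeeded;
    }

    public static void main(String[] args) {
        Queue<Request> raced = new ArrayDeque<>();
        raced.add(new Request(false, CURRENT)); // retried multi reaches the processor first
        raced.add(new Request(true, ORPHANED)); // delayed closeSession of the orphaned session
        System.out.println(currentCreateSucceeds(raced)); // false: NODEEXISTS

        Queue<Request> ordered = new ArrayDeque<>();
        ordered.add(new Request(true, ORPHANED));
        ordered.add(new Request(false, CURRENT));
        System.out.println(currentCreateSucceeds(ordered)); // true
    }
}
```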
> Broker ZNode creation can fail due to lost Zookeeper Session ID
> ---------------------------------------------------------------
>
> Key: KAFKA-14845
> URL: https://issues.apache.org/jira/browse/KAFKA-14845
> Project: Kafka
> Issue Type: Bug
> Reporter: Alexandre Dupriez
> Assignee: Alexandre Dupriez
> Priority: Minor
> Attachments: kafka-broker-reg.log, phoque.png
--
This message was sent by Atlassian Jira
(v8.20.10#820010)