RivenSun created KAFKA-13829:
--------------------------------
Summary: The function of max.in.flight.requests.per.connection
parameter does not work, it conflicts with the underlying NIO sending data
Key: KAFKA-13829
URL: https://issues.apache.org/jira/browse/KAFKA-13829
Project: Kafka
Issue Type: Bug
Components: clients
Reporter: RivenSun
Due to the implementation mechanism of the `OP_WRITE` event of Kafka's
underlying NIO, the function of the `max.in.flight.requests.per.connection`
parameter does not work. This will greatly affect Kafka's network sending
performance.
The process of Kafka's Selector sending ClientRequest can be simply divided
into the following two major steps:
h2. 1) Prepare the request data to be sent
1. NetworkClient.ready(Node node, long now) calls
NetworkClient.canSendRequest(...), which determines whether the request is
eligible to be sent.
2. NetworkClient.doSend(...) constructs `Send send` and caches the request via
inFlightRequests.add(inFlightRequest).
3. The `KafkaChannel.setSend()` method is executed. It:
checks that `this.send` *is currently null*, otherwise {*}an
IllegalStateException is thrown{*};
sets the `this.send` value;
has the transportLayer add the `OP_WRITE` event.
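The single-slot behaviour of step 3 can be sketched as follows. This is a simplified model for illustration only, not the real KafkaChannel: the class `SingleSlotChannel`, its `String` payload, and the boolean write-interest flag are all hypothetical stand-ins.

```java
// Simplified model of the single pending-send slot described in step 3.
// Hypothetical class; it only mirrors the behaviour stated in the report.
final class SingleSlotChannel {
    private String pendingSend;     // stands in for the real Send object
    private boolean writeInterest;  // stands in for OP_WRITE on the selection key

    void setSend(String send) {
        // Only one send may be pending at a time.
        if (pendingSend != null) {
            throw new IllegalStateException(
                "Attempt to begin a send while a prior send is still in progress");
        }
        pendingSend = send;
        writeInterest = true; // register interest in writing
    }

    boolean hasSend() {
        return pendingSend != null;
    }

    boolean isWriteInterested() {
        return writeInterest;
    }
}

final class SingleSlotDemo {
    public static void main(String[] args) {
        SingleSlotChannel channel = new SingleSlotChannel();
        channel.setSend("request-1");
        try {
            channel.setSend("request-2"); // slot is still occupied
        } catch (IllegalStateException e) {
            System.out.println("second setSend rejected: " + e.getMessage());
        }
    }
}
```

In this model a second `setSend` is rejected until the first send has been consumed, which is why the eligibility check has to run before `doSend`.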
h2. 2) Selector.poll(long timeout) should then be called to consume the send and send data to the network
{code:java}
Selector.poll() -> this.nioSelector.select(timeout)
Selector.pollSelectionKeys() ->
Selector.attemptWrite() ->
Selector.write(channel) ->
KafkaChannel.write() & KafkaChannel.maybeCompleteSend(){code}
1. Selector.poll -> this.nioSelector.select(timeout): picks up the previously
registered `OP_WRITE` event.
2. The `Selector.attemptWrite()` method is executed.
3. In the `KafkaChannel.write()` method, once the data has been sent
successfully, `this.send` moves to the completed state.
4. The `KafkaChannel.maybeCompleteSend()` method checks whether the send is in
the completed state; if it is not, nothing happens.
5. If the send is completed, transportLayer removes the `OP_WRITE` event and
resets the KafkaChannel.send object to null.
6. Wait for the next request that is ready to be sent.
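Steps 3 to 5 can be illustrated with a small model in which the socket accepts only part of the buffer per call. It is a sketch under stated assumptions: `WriteCycleChannel` is a hypothetical class, a `ByteBuffer` stands in for the Send object, and the `maxBytes` argument imitates a socket buffer that fills up. Only `java.nio.ByteBuffer` and the `SelectionKey` constants are real APIs here.

```java
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;

// Hypothetical model of the write / maybeCompleteSend cycle described above.
final class WriteCycleChannel {
    private ByteBuffer send;                          // the single pending send, or null
    private int interestOps = SelectionKey.OP_READ;   // modelled interest set

    void setSend(ByteBuffer payload) {
        if (send != null) {
            throw new IllegalStateException("previous send still in progress");
        }
        send = payload;
        interestOps |= SelectionKey.OP_WRITE;         // add OP_WRITE
    }

    // Consumes at most maxBytes of the pending send and returns the byte count.
    int write(int maxBytes) {
        if (send == null) {
            return 0;
        }
        int n = Math.min(maxBytes, send.remaining());
        send.position(send.position() + n);
        return n;
    }

    // Returns the send and clears OP_WRITE once it is fully written; otherwise null.
    ByteBuffer maybeCompleteSend() {
        if (send != null && !send.hasRemaining()) {
            interestOps &= ~SelectionKey.OP_WRITE;    // remove OP_WRITE
            ByteBuffer done = send;
            send = null;                              // slot is free again
            return done;
        }
        return null;
    }

    boolean isWriteInterested() {
        return (interestOps & SelectionKey.OP_WRITE) != 0;
    }
}
```

With a 10-byte payload and `write(6)` per poll iteration, the first iteration leaves the send incomplete and `OP_WRITE` registered; the second completes it, clears `OP_WRITE`, and frees the slot for the next `setSend`.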
The process above looks fine at first glance. However, a careful reading of
NetworkClient.canSendRequest(...) shows that inFlightRequests.canSendMore(node)
contains the following condition:
{code:java}
queue.peekFirst().send.completed(){code}
In addition, the `inFlightRequests.add(inFlightRequest)` method calls
*addFirst(request)*, so queue.peekFirst() always refers to the most recently
added request.
Currently, *only one send object* is stored in KafkaChannel, not a
{*}collection{*} of send objects.
Between the registration and removal of the OP_WRITE event, *only one send
object is sent* in the KafkaChannel.write() method and *only one send object is
completed* in the KafkaChannel.maybeCompleteSend() method.
So whether the clientRequest is eligible to be sent is {color:#FF0000}*limited
only by queue.peekFirst().send.completed()*{color}; the
{color:#FF0000}*max.in.flight.requests.per.connection*{color} parameter
loses its effect, and {*}{color:#FF0000}setting it to a value greater than 1 is
equivalent to setting it to 1{color}{*}.
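To make the condition under discussion concrete, here is a minimal model of the check. It is a sketch, not the Kafka source: `InFlightModel` and `Request` are hypothetical names, `sendCompleted` stands in for `send.completed()`, and the `queue.size() < maxInFlight` half of the condition is an assumption about how the parameter enters the check.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of the eligibility check for a single connection.
final class InFlightModel {
    static final class Request {
        // True once the request bytes have been fully written out.
        boolean sendCompleted;
    }

    private final Deque<Request> queue = new ArrayDeque<>();
    private final int maxInFlight;

    InFlightModel(int maxInFlight) {
        this.maxInFlight = maxInFlight;
    }

    // New requests go to the head, so peekFirst() is the most recent one.
    void add(Request request) {
        queue.addFirst(request);
    }

    boolean canSendMore() {
        return queue.isEmpty()
            || (queue.peekFirst().sendCompleted && queue.size() < maxInFlight);
    }
}
```

In this model, with `maxInFlight` set to 5 and one request whose send is still incomplete, `canSendMore()` returns false even though the queue is far below the limit; it returns true again once that send is marked completed.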
h2. Suggestion
Because of the KafkaClient architecture, we do not need to consider concurrent
execution of the `NetworkClient.poll` method by multiple threads.
1. NetworkClient.canSendRequest(...) drops the following condition:
{code:java}
queue.peekFirst().send.completed(){code}
The `max.in.flight.requests.per.connection` parameter will then take effect again.
2. Currently, the `OP_WRITE` event is registered in KafkaChannel.setSend() and
removed in KafkaChannel.maybeCompleteSend().
That may no longer be appropriate. Because we want to send more than one send
object between the registration and removal of the `OP_WRITE` event, it is
recommended {*}not to repeatedly register and remove `OP_WRITE` events{*}, but
instead to {*}register the `OP_WRITE` event once in the
`transportLayer.finishConnect()` method{*}:
{code:java}
key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT |
SelectionKey.OP_READ | SelectionKey.OP_WRITE);{code}
3. The KafkaChannel.setSend() method no longer caches just *one* incoming send
object; instead, each incoming send object is stored in a *sendCollection*
structure.
4. When KafkaChannel.write() is executed, it copies sendCollection to
{*}midWriteSendCollection{*}, clears sendCollection, and then sends all
the data in {*}midWriteSendCollection{*}.
5. The KafkaChannel.maybeCompleteSend() method checks whether
midWriteSendCollection contains any completed sends, removes all of them from
midWriteSendCollection, and returns them as completedSends to be added to
Selector.completedSends.
6. Execution of the Selector.attemptRead(channel) method has the precondition
`{*}!hasCompletedReceive(channel){*}`, so the KafkaChannel.receive object does
not need to be changed.
7. One more point, concerning the
ConsumerNetworkClient.poll(Timer timer, PollCondition pollCondition, boolean
disableWakeup) method.
Three places in this method are involved in sending data:
{code:java}
long pollDelayMs = trySend(timer.currentTimeMs());
->
client.poll(...)
->
trySend(timer.currentTimeMs());{code}
There is a problem with this process:
when calling the trySend(...) method for the second time, we should immediately
call client.poll(0, timer.currentTimeMs()); to ensure that the send generated
by each trySend(...) call can be consumed by the next Selector.poll() call.
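Points 3 to 5 of the suggestion could look roughly like the sketch below. It is one possible reading of the proposal, not an implementation against the real KafkaChannel: `BatchingChannelSketch` is a hypothetical class, `ByteBuffer` stands in for the Send object, `maxBytes` imitates a socket buffer limit, and stopping at the first partially written send (to keep requests in order) is an assumption the proposal does not spell out.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch of a channel that queues several sends (points 3 to 5).
final class BatchingChannelSketch {
    private final Deque<ByteBuffer> sendCollection = new ArrayDeque<>();
    private final Deque<ByteBuffer> midWriteSendCollection = new ArrayDeque<>();

    // Point 3: queue the send instead of rejecting it when one is pending.
    void setSend(ByteBuffer send) {
        sendCollection.addLast(send);
    }

    // Point 4: move queued sends to the mid-write queue, then write in order.
    int write(int maxBytes) {
        midWriteSendCollection.addAll(sendCollection);
        sendCollection.clear();
        int written = 0;
        for (ByteBuffer send : midWriteSendCollection) {
            int n = Math.min(maxBytes - written, send.remaining());
            send.position(send.position() + n);
            written += n;
            if (send.hasRemaining()) {
                break; // modelled socket buffer is full; keep request order
            }
        }
        return written;
    }

    // Point 5: remove and return every fully written send, oldest first.
    List<ByteBuffer> maybeCompleteSends() {
        List<ByteBuffer> completed = new ArrayList<>();
        Iterator<ByteBuffer> it = midWriteSendCollection.iterator();
        while (it.hasNext()) {
            ByteBuffer send = it.next();
            if (send.hasRemaining()) {
                break;
            }
            it.remove();
            completed.add(send);
        }
        return completed;
    }

    int pending() {
        return sendCollection.size() + midWriteSendCollection.size();
    }
}
```

A partially written send stays at the head of midWriteSendCollection, so the next write() call resumes it before touching any newer send.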
--
This message was sent by Atlassian Jira
(v8.20.1#820001)