[
https://issues.apache.org/jira/browse/KAFKA-13829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17522136#comment-17522136
]
RivenSun commented on KAFKA-13829:
----------------------------------
Hi [~guozhang] [~showuon] and [~hachikuji]
Could you give some suggestions for this issue?
Thanks.
> The function of max.in.flight.requests.per.connection parameter does not
> work, it conflicts with the underlying NIO sending data
> --------------------------------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-13829
> URL: https://issues.apache.org/jira/browse/KAFKA-13829
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Reporter: RivenSun
> Priority: Major
>
> Because of how Kafka's underlying NIO layer handles the `OP_WRITE` event,
> the `max.in.flight.requests.per.connection` parameter has no effect. This
> significantly hurts Kafka's network sending performance.
> The process of Kafka's Selector sending ClientRequest can be simply divided
> into the following two major steps:
> h2. 1) Prepare the request data to be sent
> 1. NetworkClient.ready(Node node, long now) ->
> NetworkClient.canSendRequest(...) is called to determine whether the
> request is eligible to be sent.
> 2. NetworkClient.doSend(...): constructs `Send send` and caches the request
> via inFlightRequests.add(inFlightRequest).
> 3. The `KafkaChannel.setSend()` method is executed:
> it checks that `this.send` *is currently null*, otherwise {*}an
> IllegalStateException is thrown{*};
> it sets `this.send`;
> transportLayer adds the `OP_WRITE` event.
>
> h2. 2) Selector.poll(long timeout) should then be called to consume the send
> and send data to the network.
>
> {code:java}
> Selector.poll() -> this.nioSelector.select(timeout)
> Selector.pollSelectionKeys() ->
> Selector.attemptWrite() ->
> Selector.write(channel) ->
> KafkaChannel.write() & KafkaChannel.maybeCompleteSend(){code}
>
> 1. Selector.poll -> this.nioSelector.select(timeout): get the previously
> registered `OP_WRITE` event.
> 2. Execute the `Selector.attemptWrite()` method
> 3. In the `KafkaChannel.write()` method, after the data is successfully sent,
> update `this.send` to the completed state.
> 4. In the `KafkaChannel.maybeCompleteSend()` method, check whether send is in
> the completed state, otherwise do nothing.
> 5. If send is completed, transportLayer removes the `OP_WRITE` event and
> resets the KafkaChannel.send object to null.
> 6. Wait for the next request data that is ready to be sent.
>
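The send lifecycle described in the two steps above can be sketched as a toy model. This is a minimal sketch, not Kafka's code: `ToySend`, `SingleSlotChannel`, and `SingleSlotDemo` are hypothetical names, a boolean stands in for the `OP_WRITE` interest bit, and an integer byte budget stands in for the socket.

```java
import java.util.Optional;

/** Toy stand-in for a pending network send (hypothetical, not Kafka's Send). */
final class ToySend {
    private int remainingBytes;

    ToySend(int sizeBytes) { this.remainingBytes = sizeBytes; }

    /** Pretend to write up to maxBytes of this send to the socket. */
    void writeTo(int maxBytes) { remainingBytes -= Math.min(maxBytes, remainingBytes); }

    boolean completed() { return remainingBytes == 0; }
}

/** Toy model of a channel that holds at most one pending send. */
final class SingleSlotChannel {
    private ToySend send;           // the single slot
    private boolean writeInterest;  // stands in for the OP_WRITE interest bit

    void setSend(ToySend s) {
        if (send != null) {
            throw new IllegalStateException("previous send is still in progress");
        }
        send = s;
        writeInterest = true;       // "register OP_WRITE"
    }

    /** One write attempt, bounded by what the socket accepts this time. */
    void write(int socketCapacity) {
        if (send != null) {
            send.writeTo(socketCapacity);
        }
    }

    /** Returns the send if it finished, clearing the slot and the write interest. */
    Optional<ToySend> maybeCompleteSend() {
        if (send != null && send.completed()) {
            ToySend done = send;
            send = null;
            writeInterest = false;  // "remove OP_WRITE"
            return Optional.of(done);
        }
        return Optional.empty();
    }

    boolean hasSend() { return send != null; }

    boolean isWriteInterested() { return writeInterest; }
}

final class SingleSlotDemo {
    public static void main(String[] args) {
        SingleSlotChannel channel = new SingleSlotChannel();
        channel.setSend(new ToySend(10));
        channel.write(4);                                            // partial write
        System.out.println(channel.maybeCompleteSend().isPresent()); // false
        channel.write(100);                                          // remainder written
        System.out.println(channel.maybeCompleteSend().isPresent()); // true
        System.out.println(channel.hasSend());                       // false
    }
}
```

In this model a second `setSend()` before the first send completes throws, which is the single-slot behaviour the description relies on.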
> The process above looks fine, but a careful reading of
> NetworkClient.canSendRequest(...) shows that
> inFlightRequests.canSendMore(node) contains this condition:
> {code:java}
> queue.peekFirst().send.completed(){code}
> In addition, the `inFlightRequests.add(inFlightRequest)` method calls
> *addFirst(request)*, so peekFirst() always returns the most recently added
> request.
>
> Currently, *only one send object* is stored in KafkaChannel, not a
> {*}collection{*} of send objects.
> Between the registration and removal of the OP_WRITE event, *only one send
> object is sent* in the KafkaChannel.write() method and *only one send
> object is completed* in the KafkaChannel.maybeCompleteSend() method.
> So whether a ClientRequest is eligible to be sent is {color:#ff0000}*limited
> only by queue.peekFirst().send.completed()*{color}; the
> {color:#ff0000}*max.in.flight.requests.per.connection*{color} parameter
> loses its effect, and {*}{color:#ff0000}setting it to a value greater than 1
> is equivalent to setting it to 1{color}{*}.
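How the head-of-queue check interacts with the configured limit can be illustrated with a toy gate. This is a rough sketch under stated assumptions, not Kafka's InFlightRequests: `InFlightGate`, `Entry`, and `InFlightGateDemo` are hypothetical names, and combining the quoted condition with a queue-size check via `&&` is an assumption about the shape of the gate.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model of a per-connection in-flight gate (hypothetical names). */
final class InFlightGate {
    /** Toy in-flight entry: tracks only whether its bytes were fully written. */
    static final class Entry {
        boolean sendCompleted;
    }

    private final Deque<Entry> queue = new ArrayDeque<>();
    private final int maxInFlight;

    InFlightGate(int maxInFlight) { this.maxInFlight = maxInFlight; }

    /** The newest request goes to the head, as with addFirst(request). */
    Entry add() {
        Entry entry = new Entry();
        queue.addFirst(entry);
        return entry;
    }

    /** Assumed gate: head send must be complete AND the queue must be below the limit. */
    boolean canSendMore() {
        return queue.isEmpty()
            || (queue.peekFirst().sendCompleted && queue.size() < maxInFlight);
    }
}

final class InFlightGateDemo {
    public static void main(String[] args) {
        InFlightGate gate = new InFlightGate(5);
        InFlightGate.Entry head = gate.add();
        System.out.println(gate.canSendMore()); // false: head send not fully written
        head.sendCompleted = true;
        System.out.println(gate.canSendMore()); // true: 1 in flight, limit is 5
    }
}
```

In this model the configured limit is never consulted while the head send is incomplete; the size check only matters once `sendCompleted` is true.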
> h2. Suggestion
> Because of the KafkaClient architecture, we do not need to consider
> concurrent execution of the `NetworkClient.poll` method by multiple threads.
> 1. Remove the following condition from NetworkClient.canSendRequest(...):
> {code:java}
> queue.peekFirst().send.completed(){code}
> The `max.in.flight.requests.per.connection` parameter will then take effect
> again.
> 2. Currently, the `OP_WRITE` event is registered in KafkaChannel.setSend() and
> removed in KafkaChannel.maybeCompleteSend().
> This may no longer be appropriate. Because we want to send more than one
> send object between the registration and removal of the `OP_WRITE` event, it
> is recommended {*}not to repeatedly register and remove `OP_WRITE` events{*},
> but instead to {*}register the `OP_WRITE` event in the
> `transportLayer.finishConnect()` method{*}:
> {code:java}
> key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT |
> SelectionKey.OP_READ | SelectionKey.OP_WRITE);{code}
> 3. The KafkaChannel.setSend() method no longer caches only *a single*
> incoming send object; instead, each incoming send object is stored in a
> *sendCollection* structure.
> 4. When KafkaChannel.write() is executed, copy sendCollection to
> {*}midWriteSendCollection{*}, then clear sendCollection, and finally send all
> the data in {*}midWriteSendCollection{*}.
> 5. In the KafkaChannel.maybeCompleteSend() method, check whether
> midWriteSendCollection contains any completed sends, remove all
> completedSends from midWriteSendCollection, and return them so they can be
> added to Selector.completedSends.
> 6. Selector.attemptRead(channel) already has the precondition
> `{*}!hasCompletedReceive(channel){*}`, so the KafkaChannel.receive object
> does not need to be changed.
> 7. One additional point concerns the
> ConsumerNetworkClient.poll(Timer timer, PollCondition pollCondition, boolean
> disableWakeup) method.
> Three places in this method are involved in sending data:
> {code:java}
> long pollDelayMs = trySend(timer.currentTimeMs());
> ->
> client.poll(...)
> ->
> trySend(timer.currentTimeMs());{code}
> This process has a problem:
> when trySend(...) is called the second time, we should immediately follow it
> with client.poll(0, timer.currentTimeMs()) to ensure that the send generated
> each time can be consumed by the next Selector.poll() call.
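
Items 3 to 5 of the suggestion above can be sketched as a toy model. This is only an illustration of the proposed data flow, not a patch against Kafka: `QueuedSend`, `MultiSendChannel`, and `MultiSendDemo` are hypothetical names, an integer byte budget stands in for the socket, and appending to midWriteSendCollection (rather than overwriting it) is an assumption made so that partially written sends from an earlier write() are not lost.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

/** Toy pending send with an id and a byte count (hypothetical). */
final class QueuedSend {
    final String id;
    private int remainingBytes;

    QueuedSend(String id, int sizeBytes) {
        this.id = id;
        this.remainingBytes = sizeBytes;
    }

    /** Writes up to maxBytes and returns how many bytes were written. */
    int writeTo(int maxBytes) {
        int written = Math.min(maxBytes, remainingBytes);
        remainingBytes -= written;
        return written;
    }

    boolean completed() { return remainingBytes == 0; }
}

/** Toy channel that queues several sends instead of holding a single one. */
final class MultiSendChannel {
    private final Deque<QueuedSend> sendCollection = new ArrayDeque<>();
    private final Deque<QueuedSend> midWriteSendCollection = new ArrayDeque<>();

    /** Item 3: store every incoming send instead of rejecting a second one. */
    void setSend(QueuedSend send) { sendCollection.addLast(send); }

    /** Item 4: move queued sends to the mid-write queue, then write in FIFO order. */
    void write(int socketCapacity) {
        midWriteSendCollection.addAll(sendCollection);
        sendCollection.clear();
        int budget = socketCapacity;
        for (QueuedSend send : midWriteSendCollection) {
            if (budget == 0) {
                break;              // socket buffer is full for this round
            }
            budget -= send.writeTo(budget);
        }
    }

    /** Item 5: remove and return every send that has been fully written. */
    List<QueuedSend> maybeCompleteSends() {
        List<QueuedSend> completed = new ArrayList<>();
        Iterator<QueuedSend> it = midWriteSendCollection.iterator();
        while (it.hasNext()) {
            QueuedSend send = it.next();
            if (send.completed()) {
                completed.add(send);
                it.remove();
            }
        }
        return completed;
    }
}

final class MultiSendDemo {
    public static void main(String[] args) {
        MultiSendChannel channel = new MultiSendChannel();
        channel.setSend(new QueuedSend("a", 5));
        channel.setSend(new QueuedSend("b", 5));
        channel.setSend(new QueuedSend("c", 5));
        channel.write(12);                                        // a and b finish, c is partial
        System.out.println(channel.maybeCompleteSends().size());  // 2
        channel.write(10);                                        // c finishes
        System.out.println(channel.maybeCompleteSends().size());  // 1
    }
}
```

Because the sketch writes strictly in FIFO order, only a prefix of the queue can complete in any one write(), so the byte order on the wire stays the order in which setSend() was called.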
--
This message was sent by Atlassian Jira
(v8.20.1#820001)