This is an automated email from the ASF dual-hosted git repository.

bneradt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/trafficserver.git


The following commit(s) were added to refs/heads/master by this push:
     new df078fb147 NOT_WRITE_AVAIL: schedule event to resend data frames 
(#11094)
df078fb147 is described below

commit df078fb14745be946b4599fc0d64a397c761763e
Author: Brian Neradt <[email protected]>
AuthorDate: Tue Mar 5 11:27:54 2024 -0500

    NOT_WRITE_AVAIL: schedule event to resend data frames (#11094)
    
    When sending DATA frames, we check whether the write would exceed our
    write buffer's high water mark. If it would, we don't write the frame.
    Unfortunately, before this patch, no event was scheduled to come back
    and try to resend any unwritten frames. This led to dropped frames at
    times. This patch adds a scheduled event to later re-try sending the
    unsent frames.
    
    This patch also updates the grpc.test.py to send 100 messages instead of
    1 which, at least on my local system, reproduced this issue.
---
 include/proxy/http2/Http2CommonSession.h   | 10 +++--
 include/proxy/http2/Http2ConnectionState.h |  6 ++-
 include/proxy/http2/Http2Stream.h          |  4 +-
 src/proxy/http2/Http2ConnectionState.cc    | 42 ++++++++++++++++----
 src/proxy/http2/Http2Stream.cc             |  2 +-
 tests/gold_tests/h2/grpc/grpc.test.py      | 25 +++++++++---
 tests/gold_tests/h2/grpc/grpc_client.py    | 61 ++++++++++++++++++++++++------
 tests/gold_tests/h2/grpc/grpc_server.py    | 54 +++++++++++++++++++-------
 tests/gold_tests/h2/grpc/simple.proto      |  6 ++-
 9 files changed, 162 insertions(+), 48 deletions(-)

diff --git a/include/proxy/http2/Http2CommonSession.h 
b/include/proxy/http2/Http2CommonSession.h
index 9f9b7e53b3..f24ee8b094 100644
--- a/include/proxy/http2/Http2CommonSession.h
+++ b/include/proxy/http2/Http2CommonSession.h
@@ -34,15 +34,17 @@
 // HTTP2_SESSION_EVENT_INIT   Http2CommonSession *  HTTP/2 session is born
 // HTTP2_SESSION_EVENT_FINI   Http2CommonSession *  HTTP/2 session is ended
 // HTTP2_SESSION_EVENT_RECV   Http2Frame *          Received a frame
-// HTTP2_SESSION_EVENT_XMIT   Http2Frame *          Send this frame
+// HTTP2_SESSION_EVENT_PRIO   Http2Frame *          Send this priority frame
+// HTTP2_SESSION_EVENT_DATA   Http2Frame *          Send the data frames in 
the stream
 
 #define HTTP2_SESSION_EVENT_INIT          (HTTP2_SESSION_EVENTS_START + 1)
 #define HTTP2_SESSION_EVENT_FINI          (HTTP2_SESSION_EVENTS_START + 2)
 #define HTTP2_SESSION_EVENT_RECV          (HTTP2_SESSION_EVENTS_START + 3)
 #define HTTP2_SESSION_EVENT_XMIT          (HTTP2_SESSION_EVENTS_START + 4)
-#define HTTP2_SESSION_EVENT_SHUTDOWN_INIT (HTTP2_SESSION_EVENTS_START + 5)
-#define HTTP2_SESSION_EVENT_SHUTDOWN_CONT (HTTP2_SESSION_EVENTS_START + 6)
-#define HTTP2_SESSION_EVENT_REENABLE      (HTTP2_SESSION_EVENTS_START + 7)
+#define HTTP2_SESSION_EVENT_DATA          (HTTP2_SESSION_EVENTS_START + 5)
+#define HTTP2_SESSION_EVENT_SHUTDOWN_INIT (HTTP2_SESSION_EVENTS_START + 6)
+#define HTTP2_SESSION_EVENT_SHUTDOWN_CONT (HTTP2_SESSION_EVENTS_START + 7)
+#define HTTP2_SESSION_EVENT_REENABLE      (HTTP2_SESSION_EVENTS_START + 8)
 
 enum class Http2SessionCod : int {
   NOT_PROVIDED,
diff --git a/include/proxy/http2/Http2ConnectionState.h 
b/include/proxy/http2/Http2ConnectionState.h
index a455105164..83ff405649 100644
--- a/include/proxy/http2/Http2ConnectionState.h
+++ b/include/proxy/http2/Http2ConnectionState.h
@@ -155,8 +155,9 @@ public:
   Http2ErrorCode get_shutdown_reason() const;
 
   // HTTP/2 frame sender
-  void schedule_stream(Http2Stream *stream);
+  void schedule_stream_to_send_priority_frames(Http2Stream *stream);
   void send_data_frames_depends_on_priority();
+  void schedule_stream_to_send_data_frames(Http2Stream *stream);
   void send_data_frames(Http2Stream *stream);
   Http2SendDataFrameResult send_a_data_frame(Http2Stream *stream, size_t 
&payload_length);
   void send_headers_frame(Http2Stream *stream);
@@ -387,7 +388,8 @@ private:
   //     "If the END_HEADERS bit is not set, this frame MUST be followed by
   //     another CONTINUATION frame."
   Http2StreamId continued_stream_id = 0;
-  bool _scheduled                   = false;
+  bool _priority_scheduled          = false;
+  bool _data_scheduled              = false;
   bool fini_received                = false;
   bool in_destroy                   = false;
   int recursion                     = 0;
diff --git a/include/proxy/http2/Http2Stream.h 
b/include/proxy/http2/Http2Stream.h
index 72515f1fb6..677cf6d087 100644
--- a/include/proxy/http2/Http2Stream.h
+++ b/include/proxy/http2/Http2Stream.h
@@ -204,8 +204,8 @@ private:
 
   NetTimeout _timeout{};
   HTTPParser http_parser;
-  EThread *_thread = nullptr;
-  Http2StreamId _id;
+  EThread *_thread        = nullptr;
+  Http2StreamId _id       = -1;
   Http2StreamState _state = Http2StreamState::HTTP2_STREAM_STATE_IDLE;
   int64_t _http_sm_id     = -1;
 
diff --git a/src/proxy/http2/Http2ConnectionState.cc 
b/src/proxy/http2/Http2ConnectionState.cc
index 232c4db825..703586c383 100644
--- a/src/proxy/http2/Http2ConnectionState.cc
+++ b/src/proxy/http2/Http2ConnectionState.cc
@@ -22,6 +22,7 @@
  */
 
 #include "../../iocore/net/P_Net.h"
+#include "iocore/eventsystem/Lock.h"
 #include "proxy/http2/HTTP2.h"
 #include "proxy/http2/Http2ConnectionState.h"
 #include "proxy/http2/Http2ClientSession.h"
@@ -35,6 +36,7 @@
 #include "iocore/net/TLSSNISupport.h"
 
 #include "tscore/ink_assert.h"
+#include "tscore/ink_memory.h"
 #include "tsutil/PostScript.h"
 #include "tsutil/LocalBuffer.h"
 
@@ -1447,7 +1449,14 @@ Http2ConnectionState::main_event_handler(int event, void 
*edata)
     REMEMBER(event, this->recursion);
     SCOPED_MUTEX_LOCK(lock, this->mutex, this_ethread());
     send_data_frames_depends_on_priority();
-    _scheduled = false;
+    _priority_scheduled = false;
+  } break;
+
+  case HTTP2_SESSION_EVENT_DATA: {
+    REMEMBER(event, this->recursion);
+    SCOPED_MUTEX_LOCK(lock, this->mutex, this_ethread());
+    this->restart_streams();
+    _data_scheduled = false;
   } break;
 
   // Initiate a graceful shutdown
@@ -2014,9 +2023,9 @@ 
Http2ConnectionState::update_initial_local_rwnd(Http2WindowSize new_size)
 }
 
 void
-Http2ConnectionState::schedule_stream(Http2Stream *stream)
+Http2ConnectionState::schedule_stream_to_send_priority_frames(Http2Stream 
*stream)
 {
-  Http2StreamDebug(session, stream->get_id(), "Scheduled");
+  Http2StreamDebug(session, stream->get_id(), "Scheduling sending priority 
frames");
 
   Http2DependencyTree::Node *node = stream->priority_node;
   ink_release_assert(node != nullptr);
@@ -2024,14 +2033,29 @@ Http2ConnectionState::schedule_stream(Http2Stream 
*stream)
   SCOPED_MUTEX_LOCK(lock, this->mutex, this_ethread());
   dependency_tree->activate(node);
 
-  if (!_scheduled) {
-    _scheduled = true;
+  if (!_priority_scheduled) {
+    _priority_scheduled = true;
 
     SET_HANDLER(&Http2ConnectionState::main_event_handler);
     this_ethread()->schedule_imm_local((Continuation *)this, 
HTTP2_SESSION_EVENT_XMIT);
   }
 }
 
+void
+Http2ConnectionState::schedule_stream_to_send_data_frames(Http2Stream *stream)
+{
+  Http2StreamDebug(session, stream->get_id(), "Scheduling sending data 
frames");
+
+  SCOPED_MUTEX_LOCK(lock, this->mutex, this_ethread());
+
+  if (!_data_scheduled) {
+    _data_scheduled = true;
+
+    SET_HANDLER(&Http2ConnectionState::main_event_handler);
+    this_ethread()->schedule_in((Continuation *)this, HRTIME_MSECOND, 
HTTP2_SESSION_EVENT_DATA);
+  }
+}
+
 void
 Http2ConnectionState::send_data_frames_depends_on_priority()
 {
@@ -2214,6 +2238,9 @@ Http2ConnectionState::send_data_frames(Http2Stream 
*stream)
       } else {
         ink_release_assert(!"What case is this?");
       }
+    } else if (result == Http2SendDataFrameResult::NOT_WRITE_AVAIL) {
+      // Schedule an event to wake up and try to resend the stream.
+      schedule_stream_to_send_data_frames(stream);
     }
   }
   if (!more_data && result != Http2SendDataFrameResult::DONE) {
@@ -2230,14 +2257,15 @@ Http2ConnectionState::send_headers_frame(Http2Stream 
*stream)
   int payload_length          = 0;
   uint8_t flags               = 0x00;
 
-  Http2StreamDebug(session, stream->get_id(), "Send HEADERS frame");
-
   // For outbound streams, set the ID if it has not yet already been set
   // Need to defer setting the stream ID to avoid another later created stream
   // sending out first.  This may cause the peer to issue a stream or 
connection
   // error (new stream less that the greatest we have seen so far)
   this->set_stream_id(stream);
 
+  // Keep this debug below set_stream_id so that the id is correct.
+  Http2StreamDebug(session, stream->get_id(), "Send HEADERS frame");
+
   HTTPHdr *send_hdr = stream->get_send_header();
   if (stream->expect_send_trailer()) {
     // Which is a no-op conversion
diff --git a/src/proxy/http2/Http2Stream.cc b/src/proxy/http2/Http2Stream.cc
index d958151fc4..0ee5cd36d9 100644
--- a/src/proxy/http2/Http2Stream.cc
+++ b/src/proxy/http2/Http2Stream.cc
@@ -938,7 +938,7 @@ Http2Stream::send_body(bool call_update)
 
   SCOPED_MUTEX_LOCK(lock, _proxy_ssn->mutex, this_ethread());
   if (Http2::stream_priority_enabled) {
-    connection_state.schedule_stream(this);
+    connection_state.schedule_stream_to_send_priority_frames(this);
     // signal_write_event() will be called from 
`Http2ConnectionState::send_data_frames_depends_on_priority()`
     // when write_vio is consumed
   } else {
diff --git a/tests/gold_tests/h2/grpc/grpc.test.py 
b/tests/gold_tests/h2/grpc/grpc.test.py
index b7fc962aa3..72c5249810 100644
--- a/tests/gold_tests/h2/grpc/grpc.test.py
+++ b/tests/gold_tests/h2/grpc/grpc.test.py
@@ -24,6 +24,8 @@ import sys
 class TestGrpc():
     """Test basic gRPC traffic."""
 
+    num_client_connections = 50
+
     def __init__(self, description: str):
         """Configure a TestRun for gRPC traffic.
 
@@ -64,8 +66,16 @@ class TestGrpc():
                 'proxy.config.ssl.client.verify.server.policy': 'PERMISSIVE',
                 'proxy.config.dns.nameservers': f"127.0.0.1:{dns_port}",
                 'proxy.config.dns.resolv_conf': "NULL",
-                "proxy.config.diags.debug.enabled": 1,
+
+                # Disable debug logging to avoid excessive log file size. I 
keep
+                # it here for convenience of use during manual debugging.
+                "proxy.config.diags.debug.enabled": 0,
                 "proxy.config.diags.debug.tags": "http",
+
+                # The Python gRPC module uses many WINDOW_UPDATE frames of small
+                # sizes, so we have to disable the min_avg_window_update to
+                # avoid ATS generating ERRORS logs and GOAWAY frames for them.
+                "proxy.config.http2.min_avg_window_update": 0,
             })
         return self._ts
 
@@ -84,8 +94,11 @@ class TestGrpc():
         self._server.Setup.Copy(server_key)
 
         port = get_port(self._server, 'port')
-        command = (f'{sys.executable} {tr.RunDirectory}/grpc_server.py {port} '
-                   'server.pem server.key')
+        # Each connection performs two requests, so multiply the number of
+        # connections by 2 to get the expected number of transactions.
+        command = (
+            f'{sys.executable} {tr.RunDirectory}/grpc_server.py {port} '
+            f'server.pem server.key {TestGrpc.num_client_connections * 2}')
         self._server.Command = command
         self._server.ReturnCode = 0
         return self._server
@@ -100,10 +113,12 @@ class TestGrpc():
         ts_cert = os.path.join(self._ts.Variables.SSLDir, 'server.pem')
         # The cert is for example.com, so we must use that domain.
         hostname = 'example.com'
-        command = (f'{sys.executable} {tr.RunDirectory}/grpc_client.py '
-                   f'{hostname} {proxy_port} {ts_cert}')
+        command = (
+            f'{sys.executable} {tr.RunDirectory}/grpc_client.py '
+            f'{hostname} {proxy_port} {ts_cert} 
{TestGrpc.num_client_connections}')
         tr.Processes.Default.Command = command
         tr.Processes.Default.ReturnCode = 0
+        tr.TimeOut = 10
 
     def _compile_protobuf_files(self) -> None:
         """Compile the protobuf files."""
diff --git a/tests/gold_tests/h2/grpc/grpc_client.py 
b/tests/gold_tests/h2/grpc/grpc_client.py
index db5990bee6..ba8818a0c7 100644
--- a/tests/gold_tests/h2/grpc/grpc_client.py
+++ b/tests/gold_tests/h2/grpc/grpc_client.py
@@ -17,6 +17,7 @@
 #  limitations under the License.
 
 import argparse
+import asyncio
 import grpc
 import os
 import sys
@@ -24,8 +25,11 @@ import sys
 import simple_pb2
 import simple_pb2_grpc
 
+global_message_counter: int = 0
+global_num_completed_connections: int = 0
 
-def run_grpc_client(hostname: str, proxy_port: int, proxy_cert: bytes) -> int:
+
+async def run_grpc_client(hostname: str, proxy_port: int, proxy_cert: bytes) 
-> int:
     """Run the gRPC client.
 
     :param hostname: The hostname to which to connect.
@@ -33,26 +37,61 @@ def run_grpc_client(hostname: str, proxy_port: int, 
proxy_cert: bytes) -> int:
     :param proxy_cert: The public TLS certificate to verify ATS against.
     :return: The exit code.
     """
+    global global_message_counter
+    global global_num_completed_connections
     credentials = grpc.ssl_channel_credentials(root_certificates=proxy_cert)
     channel_options = (('grpc.ssl_target_name_override', hostname),)
     destination_endpoint = f'127.0.0.1:{proxy_port}'
-    channel = grpc.secure_channel(destination_endpoint, credentials, 
options=channel_options)
-    print(f'Connecting to: {destination_endpoint}')
-    stub = simple_pb2_grpc.SimpleStub(channel)
-
-    message = simple_pb2.SimpleRequest(message="Client request message")
-    response = stub.SimpleMethod(message)
-    print(f'Response received from server: {response.message}')
+    async with grpc.aio.secure_channel(destination_endpoint, credentials, 
options=channel_options) as channel:
+        print(f'Connecting to: {destination_endpoint}')
+        stub = simple_pb2_grpc.TalkerStub(channel)
+
+        print(f'Creating two messages to send for counter: 
{global_message_counter}')
+        message_1 = simple_pb2.SimpleRequest(message=f'Client request message: 
{global_message_counter}.1')
+        message_2 = simple_pb2.SimpleRequest(message=f'Client request message: 
{global_message_counter}.2')
+        my_message_count = global_message_counter
+        global_message_counter += 1
+        print(f'Sending request: {my_message_count}.1')
+        response = await stub.MakeRequest(message_1)
+        print(f'Response {my_message_count}.1 received from server: 
{response.message}')
+        print(f'Sending the second request: {my_message_count}.2')
+        message = simple_pb2.SimpleRequest(message=f'Client request message: 
{global_message_counter}.2')
+        response = await stub.MakeAnotherRequest(message_2)
+        print(f'Response {my_message_count}.2 received from server: 
{response.message}')
+        global_num_completed_connections += 1
     return 0
 
 
+async def run_grpc_clients(hostname: str, proxy_port: int, proxy_cert: bytes, 
num_connections: int) -> int:
+    """Run the gRPC client.
+
+    :param hostname: The hostname to which to connect.
+    :param proxy_port: The ATS port to which to connect.
+    :param proxy_cert: The public TLS certificate to verify ATS against.
+    :param num_connections: The number of client connections to create.
+    :return: The exit code.
+    """
+    tasks: list[asyncio.Task] = []
+    for i in range(num_connections):
+        print(f'Creating client {i}')
+        tasks.append(run_grpc_client(hostname, proxy_port, proxy_cert))
+    await asyncio.gather(*tasks)
+    if global_num_completed_connections != num_connections:
+        print(f'Expected {num_connections} responses, but got 
{global_num_completed_connections}')
+        return 1
+    else:
+        print(f'Got the expected {num_connections} responses.')
+        return 0
+
+
 def parse_args() -> argparse.Namespace:
     """Parse command line arguments."""
     parser = argparse.ArgumentParser(description=__doc__)
     parser.add_argument('hostname', help='The hostname to which to connect.')
 
-    parser.add_argument('proxy_port', type=int, help='The ATS port to which to 
connect.')
-    parser.add_argument('proxy_cert', type=argparse.FileType('rb'), help='The 
public TLS certificate to use.')
+    parser.add_argument('proxy_port', metavar='proxy-port', type=int, 
help='The ATS port to which to connect.')
+    parser.add_argument('proxy_cert', metavar='proxy-cert', 
type=argparse.FileType('rb'), help='The public TLS certificate to use.')
+    parser.add_argument('num_connections', metavar='num-connections', 
type=int, help='The number of connections to create.')
     return parser.parse_args()
 
 
@@ -64,7 +103,7 @@ def main() -> int:
     args = parse_args()
 
     try:
-        return run_grpc_client(args.hostname, args.proxy_port, 
args.proxy_cert.read())
+        return asyncio.run(run_grpc_clients(args.hostname, args.proxy_port, 
args.proxy_cert.read(), args.num_connections))
     except grpc.RpcError as e:
         print(f'RPC failed with code {e.code()}: {e.details()}')
         return 1
diff --git a/tests/gold_tests/h2/grpc/grpc_server.py 
b/tests/gold_tests/h2/grpc/grpc_server.py
index 109b303cc3..63dbd4f564 100644
--- a/tests/gold_tests/h2/grpc/grpc_server.py
+++ b/tests/gold_tests/h2/grpc/grpc_server.py
@@ -17,6 +17,7 @@
 #  limitations under the License.
 
 import argparse
+import asyncio
 from concurrent import futures
 import grpc
 import sys
@@ -25,18 +26,30 @@ import time
 import simple_pb2
 import simple_pb2_grpc
 
+global_message_counter: int = 0
 
-class SimpleServicer(simple_pb2_grpc.SimpleServicer):
+
+class Talker(simple_pb2_grpc.TalkerServicer):
     """A gRPC servicer."""
 
-    def SimpleMethod(self, request, context):
+    async def MakeRequest(self, request: simple_pb2.SimpleRequest, context: 
grpc.aio.ServicerContext):
         """An example gRPC method."""
-        print(f'Request received from client: {request.message}')
+        global global_message_counter
+        global_message_counter += 1
+        print(f'Received request: {request.message}')
         response = simple_pb2.SimpleResponse(message=f"Echo: 
{request.message}")
         return response
 
+    async def MakeAnotherRequest(self, request: simple_pb2.SimpleRequest, 
context: grpc.aio.ServicerContext):
+        """An example gRPC method."""
+        global global_message_counter
+        global_message_counter += 1
+        print(f'Received another request: {request.message}')
+        response = simple_pb2.SimpleResponse(message=f"Another echo: 
{request.message}")
+        return response
 
-def run_grpc_server(port: int, server_cert: str, server_key: str) -> int:
+
+async def run_grpc_server(port: int, server_cert: str, server_key: str) -> int:
     """Run the gRPC server.
 
     :param port: The port on which to listen.
@@ -45,17 +58,18 @@ def run_grpc_server(port: int, server_cert: str, 
server_key: str) -> int:
     :return: The exit code.
     """
     credentials = grpc.ssl_server_credentials([(server_key, server_cert)])
-    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
-    simple_pb2_grpc.add_SimpleServicer_to_server(SimpleServicer(), server)
+    server = grpc.aio.server(futures.ThreadPoolExecutor(max_workers=10))
+    simple_pb2_grpc.add_TalkerServicer_to_server(Talker(), server)
     server_endpoint = f'127.0.0.1:{port}'
     server.add_secure_port(server_endpoint, credentials)
     print(f'Listening on: {server_endpoint}')
-    server.start()
     try:
-        server.wait_for_termination()
-    except KeyboardInterrupt:
-        print("Keyboard interrupt received, exiting.")
-        return 0
+        await server.start()
+        await server.wait_for_termination()
+    except asyncio.exceptions.CancelledError:
+        print('Shutting down the server.')
+    finally:
+        await server.stop(0)
     return 0
 
 
@@ -63,8 +77,10 @@ def parse_args() -> argparse.Namespace:
     """Parse command line arguments."""
     parser = argparse.ArgumentParser(description=__doc__)
     parser.add_argument('port', type=int, help='The port on which to listen.')
-    parser.add_argument('server_crt', type=argparse.FileType('rb'), help="The 
public TLS certificate to use.")
-    parser.add_argument('server_key', type=argparse.FileType('rb'), help="The 
private TLS key to use.")
+    parser.add_argument('server_crt', metavar='server-crt', 
type=argparse.FileType('rb'), help="The public TLS certificate to use.")
+    parser.add_argument('server_key', metavar='server-key', 
type=argparse.FileType('rb'), help="The private TLS key to use.")
+    parser.add_argument(
+        'num_expected_messages', metavar='num-expected-messages', type=int, 
help="The number of expected messages from the client.")
     return parser.parse_args()
 
 
@@ -74,7 +90,17 @@ def main() -> int:
     :return: The exit code.
     """
     args = parse_args()
-    return run_grpc_server(args.port, args.server_crt.read(), 
args.server_key.read())
+    try:
+        return asyncio.run(run_grpc_server(args.port, args.server_crt.read(), 
args.server_key.read()))
+    except KeyboardInterrupt:
+        pass
+
+    if global_message_counter == args.num_expected_messages:
+        print(f'Received {args.num_expected_messages} messages as expected.')
+        return 0
+    else:
+        print(f'Expected {args.num_expected_messages} messages, but received 
{global_message_counter}.')
+        return 1
 
 
 if __name__ == '__main__':
diff --git a/tests/gold_tests/h2/grpc/simple.proto 
b/tests/gold_tests/h2/grpc/simple.proto
index d8f2aa106b..c2f058f044 100644
--- a/tests/gold_tests/h2/grpc/simple.proto
+++ b/tests/gold_tests/h2/grpc/simple.proto
@@ -25,8 +25,10 @@ syntax = "proto3";
 
 package simple;
 
-service Simple {
-    rpc SimpleMethod(SimpleRequest) returns (SimpleResponse) {}
+service Talker {
+    rpc MakeRequest(SimpleRequest) returns (SimpleResponse) {}
+    rpc MakeAnotherRequest(SimpleRequest) returns (SimpleResponse) {}
+    rpc MakeStreamedRequest(stream SimpleRequest) returns (stream 
SimpleResponse) {}
 }
 
 message SimpleRequest {

Reply via email to