https://github.com/ashgti updated 
https://github.com/llvm/llvm-project/pull/140777

>From 048b67a1ac5f54f0cbae728b67cf8b6cbbe9ce63 Mon Sep 17 00:00:00 2001
From: John Harrison <[email protected]>
Date: Wed, 15 Oct 2025 09:12:13 -0700
Subject: [PATCH] [lldb-dap] Experimenting with async in DAP tests.

This is not fully tested yet, but the basic approach is to remove the 
background thread that can cause races in the tests.

Instead, I am using an asyncio event loop to read packets, and the tests can
run the loop to process packets as needed.

With further refactoring we should be able to push the async operations further 
up the stack into the test cases themselves.
---
 lldb/packages/Python/lldbsuite/test/dotest.py |   3 +-
 .../test/tools/lldb-dap/dap_server.py         | 900 +++++++++---------
 .../test/tools/lldb-dap/lldbdap_testcase.py   | 124 +--
 .../TestDAP_breakpointAssembly.py             |   2 +-
 .../tools/lldb-dap/console/TestDAP_console.py |   4 +-
 lldb/test/API/tools/lldb-dap/io/TestDAP_io.py |  50 +-
 .../tools/lldb-dap/launch/TestDAP_launch.py   |  43 +-
 .../tools/lldb-dap/server/TestDAP_server.py   |  41 +-
 8 files changed, 574 insertions(+), 593 deletions(-)

diff --git a/lldb/packages/Python/lldbsuite/test/dotest.py 
b/lldb/packages/Python/lldbsuite/test/dotest.py
index 63f7df4de1894..8d9e3c6c16054 100644
--- a/lldb/packages/Python/lldbsuite/test/dotest.py
+++ b/lldb/packages/Python/lldbsuite/test/dotest.py
@@ -300,7 +300,8 @@ def parseOptionsAndInitTestdirs():
             configuration.libcxx_include_target_dir = 
args.libcxx_include_target_dir
             configuration.libcxx_library_dir = args.libcxx_library_dir
 
-    configuration.cmake_build_type = args.cmake_build_type.lower()
+    if args.cmake_build_type:
+        configuration.cmake_build_type = args.cmake_build_type.lower()
 
     if args.channels:
         lldbtest_config.channels = args.channels
diff --git a/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/dap_server.py 
b/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/dap_server.py
index 8eb64b4ab8b2b..39c17fc957357 100644
--- a/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/dap_server.py
+++ b/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/dap_server.py
@@ -1,28 +1,30 @@
 #!/usr/bin/env python
 
+import asyncio
 import binascii
+import enum
 import json
 import optparse
 import os
-import pprint
+
+# import pprint
+import signal
 import socket
 import string
 import subprocess
-import signal
 import sys
-import threading
-import time
+from warnings import warn
 from typing import (
     Any,
+    cast,
     Optional,
     Dict,
-    cast,
     List,
     Callable,
     IO,
     Union,
-    BinaryIO,
     TextIO,
+    Tuple,
     TypedDict,
     Literal,
 )
@@ -44,7 +46,7 @@ class Request(TypedDict, total=False):
     arguments: Any
 
 
-class Response(TypedDict):
+class Response(TypedDict, total=False):
     type: Literal["response"]
     seq: int
     request_seq: int
@@ -62,29 +64,27 @@ class Source(TypedDict, total=False):
     path: str
     sourceReference: int
 
-    @staticmethod
-    def build(
-        *,
-        name: Optional[str] = None,
-        path: Optional[str] = None,
-        source_reference: Optional[int] = None,
-    ) -> "Source":
-        """Builds a source from the given name, path or source_reference."""
-        if not name and not path and not source_reference:
-            raise ValueError(
-                "Source.build requires either name, path, or source_reference"
-            )
 
-        s = Source()
-        if name:
-            s["name"] = name
-        if path:
-            if not name:
-                s["name"] = os.path.basename(path)
-            s["path"] = path
-        if source_reference is not None:
-            s["sourceReference"] = source_reference
-        return s
+def source(
+    *,
+    name: Optional[str] = None,
+    path: Optional[str] = None,
+    source_reference: Optional[int] = None,
+) -> "Source":
+    """Builds a source from the given name, path or source_reference."""
+    if not name and not path and not source_reference:
+        raise ValueError("Source.build requires either name, path, or 
source_reference")
+
+    s: Source = {}
+    if name:
+        s["name"] = name
+    if path:
+        if not name:
+            s["name"] = os.path.basename(path)
+        s["path"] = path
+    if source_reference is not None:
+        s["sourceReference"] = source_reference
+    return s
 
 
 class Breakpoint(TypedDict, total=False):
@@ -92,9 +92,9 @@ class Breakpoint(TypedDict, total=False):
     verified: bool
     source: Source
 
-    @staticmethod
-    def is_verified(src: "Breakpoint") -> bool:
-        return src.get("verified", False)
+
+def is_verified(src: "Breakpoint") -> bool:
+    return src.get("verified", False)
 
 
 def dump_memory(base_addr, data, num_per_line, outfile):
@@ -167,70 +167,131 @@ def packet_type_is(packet, packet_type):
     return "type" in packet and packet["type"] == packet_type
 
 
-def dump_dap_log(log_file: Optional[str]) -> None:
-    print("========= DEBUG ADAPTER PROTOCOL LOGS =========", file=sys.stderr)
+def dump_dap_log(log_file: Optional[str], file: TextIO) -> None:
+    print("========= DEBUG ADAPTER PROTOCOL LOGS =========", file=file)
     if log_file is None:
-        print("no log file available", file=sys.stderr)
+        print("no log file available", file=file)
     else:
-        with open(log_file, "r") as file:
-            print(file.read(), file=sys.stderr)
-    print("========= END =========", file=sys.stderr)
+        with open(log_file, "r") as logs:
+            print(logs.read(), file=file)
+    print("========= END =========", file=file)
 
 
 class NotSupportedError(KeyError):
     """Raised if a feature is not supported due to its capabilities."""
 
 
+class State(enum.Enum):
+    ALLOCATED = 0
+    INITIALIZING = 1
+    INITIALIZED = 2
+    RUNNING = 3
+    DISCONNECTING = 4
+    DISCONNECTED = 5
+
+
 class DebugCommunication(object):
+    _log_file: Optional[str]
+    _loop: asyncio.AbstractEventLoop
+    _send: asyncio.StreamWriter
+    _recv: asyncio.StreamReader
+    # For debugging test failures, try setting `_trace_file = sys.stderr`.
+    _trace_file: Optional[TextIO] = None
+    _sequence: int = 1
+    _response_handlers: Dict[int, asyncio.Future] = {}
+    _packets: List[ProtocolMessage] = []
+
+    # Connection state
+    state = State.ALLOCATED
+    process: Optional[asyncio.subprocess.Process] = None
+
+    # Session state
+    capabilities: Dict = {}
+    configuration_done_sent = False
+    exit_status: Optional[int] = None
+    init_commands: List[str] = []
+    output: Dict[str, str] = {}  # keyed by category
+    terminated = False
+
+    # debuggee state
+    threads: Dict[int, str] = {}  # keyed by thread id
+    thread_stop_reasons: Dict[int, Any] = {}  # keyed by thread id
+    frame_scopes: Dict[int, Any] = {}  # keyed by frame id
+    breakpoints: Dict[int, Breakpoint] = {}  # keyed by breakpoint id
+
+    @property
+    def is_initialized(self) -> bool:
+        """Returns true if the debugger is initialized."""
+        return self.state in (
+            State.INITIALIZED,
+            State.RUNNING,
+            State.DISCONNECTING,
+            State.DISCONNECTED,
+        )
+
+    @property
+    def is_stopped(self) -> bool:
+        """Returns true if the debuggee is in a stopped state, including if it 
has exited."""
+        return self.is_exited or len(self.thread_stop_reasons) > 0
+
+    @property
+    def is_exited(self) -> bool:
+        """Returns true if the debuggee process has exited."""
+        return self.exit_status is not None
+
+    @property
+    def events(self) -> List[Event]:
+        return [p for p in self._packets if p["type"] == "event"]
+
+    @property
+    def reverse_requests(self) -> List[Request]:
+        return [p for p in self._packets if p["type"] == "request"]
+
+    @property
+    def module_events(self) -> List[Dict]:
+        return [e["body"] for e in self.events if e["event"] == "module"]
+
+    @property
+    def progress_events(self) -> List[Event]:
+        return [e for e in self.events if e["event"].startswith("progress")]
+
+    @property
+    def memory_events(self) -> List[Event]:
+        return [e for e in self.events if e["event"] == "memory"]
+
+    @property
+    def process_event(self) -> Optional[Event]:
+        for e in self.events:
+            if e["event"] == "process":
+                return e
+        return None
+
+    @property
+    def invalidated_events(self) -> List[Event]:
+        return [e for e in self.events if e["event"] == "invalidated"]
+
     def __init__(
         self,
-        recv: BinaryIO,
-        send: BinaryIO,
-        init_commands: list[str],
-        log_file: Optional[TextIO] = None,
+        loop: asyncio.AbstractEventLoop,
+        recv: asyncio.StreamReader,
+        send: asyncio.StreamWriter,
+        init_commands: List[str] = [],
+        log_file: Optional[str] = None,
     ):
-        # For debugging test failures, try setting `trace_file = sys.stderr`.
-        self.trace_file: Optional[TextIO] = None
-        self.log_file = log_file
-        self.send = send
-        self.recv = recv
-
-        # Packets that have been received and processed but have not yet been
-        # requested by a test case.
-        self._pending_packets: List[Optional[ProtocolMessage]] = []
-        # Received packets that have not yet been processed.
-        self._recv_packets: List[Optional[ProtocolMessage]] = []
-        # Used as a mutex for _recv_packets and for notify when _recv_packets
-        # changes.
-        self._recv_condition = threading.Condition()
-        self._recv_thread = threading.Thread(target=self._read_packet_thread)
-
-        # session state
-        self.init_commands = init_commands
-        self.exit_status: Optional[int] = None
-        self.capabilities: Dict = {}
-        self.initialized: bool = False
-        self.configuration_done_sent: bool = False
-        self.process_event_body: Optional[Dict] = None
-        self.terminated: bool = False
-        self.events: List[Event] = []
-        self.progress_events: List[Event] = []
-        self.invalidated_event: Optional[Event] = None
-        self.memory_event: Optional[Event] = None
-        self.reverse_requests: List[Request] = []
-        self.module_events: List[Dict] = []
-        self.sequence: int = 1
-        self.output: Dict[str, str] = {}
-
-        # debuggee state
-        self.threads: Optional[dict] = None
-        self.thread_stop_reasons: Dict[str, Any] = {}
-        self.frame_scopes: Dict[str, Any] = {}
-        # keyed by breakpoint id
-        self.resolved_breakpoints: dict[str, Breakpoint] = {}
-
-        # trigger enqueue thread
-        self._recv_thread.start()
+        self._log_file = log_file
+        self._loop = loop
+        self._send = send
+        self._recv = recv
+        self.init_commands = init_commands[:]
+        self._sequence: int = 1
+        self._response_handlers = {}
+        self._packets = []
+        self.capabilities = {}
+        self.output = {}
+        self.threads = {}
+        self.thread_stop_reasons = {}
+        self.frame_scopes = {}
+        self.breakpoints = {}
 
     @classmethod
     def encode_content(cls, s: str) -> bytes:
@@ -247,17 +308,33 @@ def validate_response(cls, command, response):
                 f"seq mismatch in response {command['seq']} != 
{response['request_seq']}"
             )
 
-    def _read_packet_thread(self):
+    async def _read(self) -> ProtocolMessage:
         try:
-            while True:
-                packet = read_packet(self.recv, trace_file=self.trace_file)
-                # `packet` will be `None` on EOF. We want to pass it down to
-                # handle_recv_packet anyway so the main thread can handle 
unexpected
-                # termination of lldb-dap and stop waiting for new packets.
-                if not self._handle_recv_packet(packet):
+            content_length = 0
+            headers = await self._recv.readuntil(b"\r\n\r\n")
+            for raw_header in headers.decode().split("\r\n"):
+                k, v = raw_header.split(":", 1)
+                if k.lower() == "content-length":
+                    content_length = int(v.strip())
                     break
-        finally:
-            dump_dap_log(self.log_file)
+            if content_length == 0:
+                raise Exception("malformed DAP content header, no 
Content-Length")
+            data = await self._recv.readexactly(content_length)
+            return json.loads(data.decode())
+        except asyncio.IncompleteReadError:  # EOF or connection error
+            self._send.close()
+            self._recv.feed_eof()
+            raise EOFError()
+
+    async def _process_packet(self) -> None:
+        if self.state == State.DISCONNECTED:
+            raise ConnectionResetError()  # no longer connected
+        packet = await self._read()
+        await self._handle_packet(packet)
+
+    def dump_log(self, file=sys.stderr):
+        if self._log_file:
+            dump_dap_log(self._log_file, file)
 
     def get_modules(
         self, start_module: Optional[int] = None, module_count: Optional[int] 
= None
@@ -272,18 +349,16 @@ def get_modules(
         return modules
 
     def get_output(self, category: str, clear=True) -> str:
-        output = ""
-        if category in self.output:
-            output = self.output.get(category, "")
-            if clear:
-                del self.output[category]
-        return output
+        if clear:
+            return self.output.pop(category, "")
+        else:
+            return self.output.get(category, "")
 
     def collect_output(
         self,
         category: str,
         timeout: float,
-        pattern: Optional[str] = None,
+        pattern: str,
         clear=True,
     ) -> str:
         """Collect output from 'output' events.
@@ -296,101 +371,72 @@ def collect_output(
         Returns:
             The collected output.
         """
-        deadline = time.monotonic() + timeout
-        output = self.get_output(category, clear)
-        while deadline >= time.monotonic() and (
-            pattern is None or pattern not in output
-        ):
-            event = self.wait_for_event(["output"], timeout=deadline - 
time.monotonic())
-            if not event:  # Timeout or EOF
-                break
-            output += self.get_output(category, clear=clear)
-        return output
-
-    def _enqueue_recv_packet(self, packet: Optional[ProtocolMessage]):
-        with self.recv_condition:
-            self.recv_packets.append(packet)
-            self.recv_condition.notify()
-
-    def _handle_recv_packet(self, packet: Optional[ProtocolMessage]) -> bool:
-        """Handles an incoming packet.
-
-        Called by the read thread that is waiting for all incoming packets
-        to store the incoming packet in "self._recv_packets" in a thread safe
-        way. This function will then signal the "self._recv_condition" to
-        indicate a new packet is available.
 
-        Args:
-            packet: A new packet to store.
+        def check_output():
+            output = self.get_output(category, clear=False)
+            return pattern is not None and pattern in output
 
-        Returns:
-            True if the caller should keep calling this function for more
-            packets.
-        """
-        with self._recv_condition:
-            self._recv_packets.append(packet)
-            self._recv_condition.notify()
-            # packet is None on EOF
-            return packet is not None and not (
-                packet["type"] == "response" and packet["command"] == 
"disconnect"
-            )
+        self._run_until(predicate=check_output, timeout=timeout)
+        return self.get_output(category, clear)
 
-    def _recv_packet(
+    def _run_until(
         self,
         *,
-        predicate: Optional[Callable[[ProtocolMessage], bool]] = None,
-        timeout: Optional[float] = None,
-    ) -> Optional[ProtocolMessage]:
-        """Processes received packets from the adapter.
+        predicate: Callable[[], bool],
+        timeout: Optional[float] = 10.0,
+    ) -> None:
+        """Run the event loop until the given predicate is true.
+
         Updates the DebugCommunication stateful properties based on the 
received
         packets in the order they are received.
-        NOTE: The only time the session state properties should be updated is
-        during this call to ensure consistency during tests.
+
         Args:
             predicate:
-                Optional, if specified, returns the first packet that matches
-                the given predicate.
+                returns once the given predicate is true.
             timeout:
                 Optional, if specified, processes packets until either the
                 timeout occurs or the predicate matches a packet, whichever
                 occurs first.
-        Returns:
-            The first matching packet for the given predicate, if specified,
-            otherwise None.
         """
-        assert (
-            threading.current_thread != self._recv_thread
-        ), "Must not be called from the _recv_thread"
-
-        def process_until_match():
-            self._process_recv_packets()
-            for i, packet in enumerate(self._pending_packets):
-                if packet is None:
-                    # We need to return a truthy value to break out of the
-                    # wait_for, use `EOFError` as an indicator of EOF.
-                    return EOFError()
-                if predicate and predicate(packet):
-                    self._pending_packets.pop(i)
-                    return packet
-
-        with self._recv_condition:
-            packet = self._recv_condition.wait_for(process_until_match, 
timeout)
-            return None if isinstance(packet, EOFError) else packet
-
-    def _process_recv_packets(self) -> None:
+
+        async def fn():
+            while not predicate():
+                await asyncio.shield(self._process_packet())
+
+        try:
+            self._loop.run_until_complete(asyncio.wait_for(fn(), timeout))
+        except asyncio.exceptions.TimeoutError:
+            warn(
+                "timeout occurred waiting on predicate, predicate may need to 
be inverted",
+                stacklevel=2,
+            )
+
+    async def _handle_packet(self, packet: ProtocolMessage) -> None:
         """Process received packets, updating the session state."""
-        with self._recv_condition:
-            for packet in self._recv_packets:
-                # Handle events that may modify any stateful properties of
-                # the DAP session.
-                if packet and packet["type"] == "event":
-                    self._handle_event(packet)
-                elif packet and packet["type"] == "request":
-                    # Handle reverse requests and keep processing.
-                    self._handle_reverse_request(packet)
-                # Move the packet to the pending queue.
-                self._pending_packets.append(packet)
-            self._recv_packets.clear()
+        self._packets.append(packet)
+        if packet and packet["type"] == "event":
+            self._handle_event(packet)
+        elif packet and packet["type"] == "request":
+            # Handle reverse requests and keep processing.
+            self._handle_reverse_request(packet)
+        elif packet and packet["type"] == "response":
+            if packet["command"] == "disconnect":
+                self.state = State.DISCONNECTED
+                if self.process and self.process.returncode is None:
+                    try:
+                        await asyncio.wait_for(self.process.wait(), 
timeout=10.0)
+                    except asyncio.exceptions.TimeoutError:
+                        self.process.terminate()
+                        await self.process.wait()
+                    if self.process.returncode != 0:
+                        raise DebugAdapterProcessError(self.process.returncode)
+            elif packet["command"] == "configurationDone":
+                self.state = State.RUNNING
+
+            if packet["request_seq"] not in self._response_handlers:
+                raise RuntimeError("unexpected response: %r" % (packet,))
+
+            self._response_handlers[packet["request_seq"]].set_result(packet)
 
     def _handle_event(self, packet: Event) -> None:
         """Handle any events that modify debug session state we track."""
@@ -406,14 +452,12 @@ def _handle_event(self, packet: Event) -> None:
             else:
                 self.output[category] = output
         elif event == "initialized":
-            self.initialized = True
-        elif event == "process":
-            # When a new process is attached or launched, remember the
-            # details that are available in the body of the event
-            self.process_event_body = body
+            self.state = State.INITIALIZED
+        elif event == "terminated":
+            # The debugger has terminated.
+            self.terminated = True
         elif event == "exited" and body:
-            # Process exited, mark the status to indicate the process is not
-            # alive.
+            # The debuggee has exited, store the exit code.
             self.exit_status = body["exitCode"]
         elif event == "continued" and body:
             # When the process continues, clear the known threads and
@@ -431,26 +475,22 @@ def _handle_event(self, packet: Event) -> None:
             self._process_stopped()
             tid = body["threadId"]
             self.thread_stop_reasons[tid] = body
-        elif event.startswith("progress"):
-            # Progress events come in as 'progressStart', 'progressUpdate',
-            # and 'progressEnd' events. Keep these around in case test
-            # cases want to verify them.
-            self.progress_events.append(packet)
         elif event == "breakpoint" and body:
             # Breakpoint events are sent when a breakpoint is resolved
             self._update_verified_breakpoints([body["breakpoint"]])
         elif event == "capabilities" and body:
             # Update the capabilities with new ones from the event.
             self.capabilities.update(body["capabilities"])
-        elif event == "invalidated":
-            self.invalidated_event = packet
-        elif event == "memory":
-            self.memory_event = packet
 
     def _handle_reverse_request(self, request: Request) -> None:
-        if request in self.reverse_requests:
-            return
-        self.reverse_requests.append(request)
+        response: Response = {
+            "type": "response",
+            "seq": 0,
+            "request_seq": request["seq"],
+            "success": True,
+            "command": request["command"],
+            "body": None,
+        }
         arguments = request.get("arguments")
         if request["command"] == "runInTerminal" and arguments is not None:
             in_shell = arguments.get("argsCanBeInterpretedByShell", False)
@@ -469,28 +509,11 @@ def _handle_reverse_request(self, request: Request) -> 
None:
                 body["shellProcessId"] = proc.pid
             else:
                 body["processId"] = proc.pid
-            self.send_packet(
-                {
-                    "type": "response",
-                    "seq": 0,
-                    "request_seq": request["seq"],
-                    "success": True,
-                    "command": "runInTerminal",
-                    "body": body,
-                }
-            )
+            response["body"] = body
+            self.send_packet(response)
         elif request["command"] == "startDebugging":
-            self.send_packet(
-                {
-                    "type": "response",
-                    "seq": 0,
-                    "request_seq": request["seq"],
-                    "success": True,
-                    "message": None,
-                    "command": "startDebugging",
-                    "body": {},
-                }
-            )
+            response["body"] = {}
+            self.send_packet(response)
         else:
             desc = 'unknown reverse request "%s"' % (request["command"])
             raise ValueError(desc)
@@ -500,131 +523,89 @@ def _process_continued(self, all_threads_continued: 
bool):
         if all_threads_continued:
             self.thread_stop_reasons = {}
 
-    def _update_verified_breakpoints(self, breakpoints: list[Breakpoint]):
+    def _update_verified_breakpoints(self, breakpoints: List[Breakpoint]):
         for bp in breakpoints:
             # If no id is set, we cannot correlate the given breakpoint across
             # requests, ignore it.
             if "id" not in bp:
                 continue
 
-            self.resolved_breakpoints[str(bp["id"])] = bp
+            self.breakpoints[bp["id"]] = bp
 
-    def send_packet(self, packet: ProtocolMessage) -> int:
+    def send_packet(
+        self, packet: ProtocolMessage
+    ) -> Optional[asyncio.Future[Response]]:
         """Takes a dictionary representation of a DAP request and send the 
request to the debug adapter.
 
         Returns the seq number of the request.
         """
+        fut = None
         # Set the seq for requests.
         if packet["type"] == "request":
-            packet["seq"] = self.sequence
-            self.sequence += 1
+            if packet.get("seq", 0) == 0:
+                packet["seq"] = self._sequence
+                self._sequence += 1
+            fut = self._loop.create_future()
+            self._response_handlers[packet["seq"]] = fut
         else:
             packet["seq"] = 0
 
         # Encode our command dictionary as a JSON string
         json_str = json.dumps(packet, separators=(",", ":"))
 
-        if self.trace_file:
-            self.trace_file.write("to adapter:\n%s\n" % (json_str))
+        if self._trace_file:
+            self._trace_file.write("to adapter:\n%s\n" % (json_str))
 
         length = len(json_str)
         if length > 0:
             # Send the encoded JSON packet and flush the 'send' file
-            self.send.write(self.encode_content(json_str))
-            self.send.flush()
+            self._send.write(self.encode_content(json_str))
+        return fut
 
-        return packet["seq"]
-
-    def _send_recv(self, request: Request) -> Optional[Response]:
+    def _send_recv(self, request: Request) -> Response:
         """Send a command python dictionary as JSON and receive the JSON
         response. Validates that the response is the correct sequence and
         command in the reply. Any events that are received are added to the
         events list in this object"""
-        seq = self.send_packet(request)
-        response = self.receive_response(seq)
+        fut = self.send_packet(request)
+        if fut is None:
+            raise ValueError(f"failed to send {request!r}")
+        self._run_until(predicate=fut.done)
+        response = fut.result()
         if response is None:
             raise ValueError(f"no response for {request!r}")
         self.validate_response(request, response)
         return response
 
-    def receive_response(self, seq: int) -> Optional[Response]:
-        """Waits for a response with the associated request_sec."""
-
-        def predicate(p: ProtocolMessage):
-            return p["type"] == "response" and p["request_seq"] == seq
-
-        return cast(Optional[Response], self._recv_packet(predicate=predicate))
+    def wait_for_stopped(self, timeout: Optional[float] = None) -> None:
+        self._run_until(predicate=lambda: self.is_stopped, timeout=timeout)
 
-    def wait_for_event(
-        self, filter: List[str] = [], timeout: Optional[float] = None
-    ) -> Optional[Event]:
-        """Wait for the first event that matches the filter."""
-
-        def predicate(p: ProtocolMessage):
-            return p["type"] == "event" and p["event"] in filter
-
-        return cast(
-            Optional[Event], self._recv_packet(predicate=predicate, 
timeout=timeout)
-        )
+    def wait_for_breakpoints_to_be_verified(
+        self, breakpoint_ids: List[int], timeout: Optional[float] = None
+    ) -> List[int]:
+        """Wait for all breakpoints to be verified. Return all unverified 
breakpoints."""
+        assert len(breakpoint_ids) > 0, "must wait for at least one breakpoint"
 
-    def wait_for_stopped(
-        self, timeout: Optional[float] = None
-    ) -> Optional[List[Event]]:
-        stopped_events = []
-        stopped_event = self.wait_for_event(
-            filter=["stopped", "exited"], timeout=timeout
-        )
-        while stopped_event:
-            stopped_events.append(stopped_event)
-            # If we exited, then we are done
-            if stopped_event["event"] == "exited":
-                break
-            # Otherwise we stopped and there might be one or more 'stopped'
-            # events for each thread that stopped with a reason, so keep
-            # checking for more 'stopped' events and return all of them
-            stopped_event = self.wait_for_event(
-                filter=["stopped", "exited"], timeout=0.25
+        def predicate() -> bool:
+            return all(
+                id in self.breakpoints and is_verified(self.breakpoints[id])
+                for id in breakpoint_ids
             )
-        return stopped_events
 
-    def wait_for_breakpoint_events(self, timeout: Optional[float] = None):
-        breakpoint_events: list[Event] = []
-        while True:
-            event = self.wait_for_event(["breakpoint"], timeout=timeout)
-            if not event:
-                break
-            breakpoint_events.append(event)
-        return breakpoint_events
-
-    def wait_for_breakpoints_to_be_verified(
-        self, breakpoint_ids: list[str], timeout: Optional[float] = None
-    ):
-        """Wait for all breakpoints to be verified. Return all unverified 
breakpoints."""
-        while any(id not in self.resolved_breakpoints for id in 
breakpoint_ids):
-            breakpoint_event = self.wait_for_event(["breakpoint"], 
timeout=timeout)
-            if breakpoint_event is None:
-                break
+        self._run_until(predicate=predicate, timeout=timeout)
 
         return [
             id
             for id in breakpoint_ids
-            if (
-                id not in self.resolved_breakpoints
-                or not Breakpoint.is_verified(self.resolved_breakpoints[id])
-            )
+            if (id not in self.breakpoints or not 
is_verified(self.breakpoints[id]))
         ]
 
-    def wait_for_exited(self, timeout: Optional[float] = None):
-        event_dict = self.wait_for_event(["exited"], timeout=timeout)
-        if event_dict is None:
-            raise ValueError("didn't get exited event")
-        return event_dict
+    def wait_for_exited(self, timeout: Optional[float] = None) -> int:
+        self._run_until(predicate=lambda: self.is_exited, timeout=timeout)
+        return cast(int, self.exit_status)
 
-    def wait_for_terminated(self, timeout: Optional[float] = None):
-        event_dict = self.wait_for_event(["terminated"], timeout)
-        if event_dict is None:
-            raise ValueError("didn't get terminated event")
-        return event_dict
+    def wait_for_terminated(self, timeout: Optional[float] = None) -> None:
+        self._run_until(predicate=lambda: self.terminated, timeout=timeout)
 
     def get_capability(self, key: str):
         """Get a value for the given key if it there is a key/value pair in
@@ -659,7 +640,7 @@ def get_stackFrame(self, frameIndex=0, threadId=None):
             print("invalid threadId")
             return None
         response = self.request_stackTrace(threadId, startFrame=frameIndex, 
levels=1)
-        if response:
+        if response and response["body"]["stackFrames"]:
             return response["body"]["stackFrames"][0]
         print("invalid response")
         return None
@@ -740,66 +721,26 @@ def get_local_variable_child(
                 return child
         return None
 
-    def replay_packets(self, replay_file_path):
-        f = open(replay_file_path, "r")
-        mode = "invalid"
-        set_sequence = False
-        command_dict = None
-        while mode != "eof":
-            if mode == "invalid":
-                line = f.readline()
-                if line.startswith("to adapter:"):
-                    mode = "send"
-                elif line.startswith("from adapter:"):
-                    mode = "recv"
-            elif mode == "send":
-                command_dict = read_packet(f)
-                # Skip the end of line that follows the JSON
-                f.readline()
-                if command_dict is None:
-                    raise ValueError("decode packet failed from replay file")
-                print("Sending:")
-                pprint.PrettyPrinter(indent=2).pprint(command_dict)
-                # raw_input('Press ENTER to send:')
-                self.send_packet(command_dict, set_sequence)
-                mode = "invalid"
-            elif mode == "recv":
-                print("Replay response:")
-                replay_response = read_packet(f)
-                # Skip the end of line that follows the JSON
-                f.readline()
-                pprint.PrettyPrinter(indent=2).pprint(replay_response)
-                actual_response = self.recv_packet()
-                if actual_response:
-                    type = actual_response["type"]
-                    print("Actual response:")
-                    if type == "response":
-                        self.validate_response(command_dict, actual_response)
-                    pprint.PrettyPrinter(indent=2).pprint(actual_response)
-                else:
-                    print("error: didn't get a valid response")
-                mode = "invalid"
-
     def request_attach(
         self,
         *,
         program: Optional[str] = None,
         pid: Optional[int] = None,
         waitFor=False,
-        initCommands: Optional[list[str]] = None,
-        preRunCommands: Optional[list[str]] = None,
-        attachCommands: Optional[list[str]] = None,
-        postRunCommands: Optional[list[str]] = None,
-        stopCommands: Optional[list[str]] = None,
-        exitCommands: Optional[list[str]] = None,
-        terminateCommands: Optional[list[str]] = None,
+        initCommands: Optional[List[str]] = None,
+        preRunCommands: Optional[List[str]] = None,
+        attachCommands: Optional[List[str]] = None,
+        postRunCommands: Optional[List[str]] = None,
+        stopCommands: Optional[List[str]] = None,
+        exitCommands: Optional[List[str]] = None,
+        terminateCommands: Optional[List[str]] = None,
         coreFile: Optional[str] = None,
         stopOnEntry=False,
-        sourceMap: Optional[Union[list[tuple[str, str]], dict[str, str]]] = 
None,
+        sourceMap: Optional[Union[List[tuple[str, str]], Dict[str, str]]] = 
None,
         gdbRemotePort: Optional[int] = None,
         gdbRemoteHostname: Optional[str] = None,
     ):
-        args_dict = {}
+        args_dict: Dict[str, Any] = {}
         if pid is not None:
             args_dict["pid"] = pid
         if program is not None:
@@ -873,7 +814,7 @@ def _process_stopped(self):
         self.frame_scopes = {}
 
     def request_continue(self, threadId=None, singleThread=False):
-        if self.exit_status is not None:
+        if self.is_exited:
             raise ValueError("request_continue called after process exited")
         # If we have launched or attached, then the first continue is done by
         # sending the 'configurationDone' request
@@ -913,6 +854,10 @@ def request_restart(self, restartArguments=None):
         return response
 
     def request_disconnect(self, terminateDebuggee=None):
+        if self.state == State.DISCONNECTED:
+            # FIXME: should this raise if we are already disconnected?
+            return None
+        self.state = State.DISCONNECTING
         args_dict = {}
         if terminateDebuggee is not None:
             if terminateDebuggee:
@@ -1025,6 +970,7 @@ def request_initialize(self, sourceInitFile=False):
                 "$__lldb_sourceInitFile": sourceInitFile,
             },
         }
+        self.state = State.INITIALIZING
         response = self._send_recv(command_dict)
         if response:
             if "body" in response:
@@ -1035,33 +981,33 @@ def request_launch(
         self,
         program: str,
         *,
-        args: Optional[list[str]] = None,
+        args: Optional[List[str]] = None,
         cwd: Optional[str] = None,
-        env: Optional[dict[str, str]] = None,
+        env: Optional[Dict[str, str]] = None,
         stopOnEntry=False,
         disableASLR=False,
         disableSTDIO=False,
         shellExpandArguments=False,
         console: Optional[str] = None,
-        stdio: Optional[list[str]] = None,
+        stdio: Optional[List[str]] = None,
         enableAutoVariableSummaries=False,
         displayExtendedBacktrace=False,
         enableSyntheticChildDebugging=False,
-        initCommands: Optional[list[str]] = None,
-        preRunCommands: Optional[list[str]] = None,
-        launchCommands: Optional[list[str]] = None,
-        postRunCommands: Optional[list[str]] = None,
-        stopCommands: Optional[list[str]] = None,
-        exitCommands: Optional[list[str]] = None,
-        terminateCommands: Optional[list[str]] = None,
-        sourceMap: Optional[Union[list[tuple[str, str]], dict[str, str]]] = 
None,
+        initCommands: Optional[List[str]] = None,
+        preRunCommands: Optional[List[str]] = None,
+        launchCommands: Optional[List[str]] = None,
+        postRunCommands: Optional[List[str]] = None,
+        stopCommands: Optional[List[str]] = None,
+        exitCommands: Optional[List[str]] = None,
+        terminateCommands: Optional[List[str]] = None,
+        sourceMap: Optional[Union[List[Tuple[str, str]], Dict[str, str]]] = 
None,
         sourcePath: Optional[str] = None,
         debuggerRoot: Optional[str] = None,
         commandEscapePrefix: Optional[str] = None,
         customFrameFormat: Optional[str] = None,
         customThreadFormat: Optional[str] = None,
     ):
-        args_dict = {"program": program}
+        args_dict: Dict[str, Any] = {"program": program}
         if args:
             args_dict["args"] = args
         if cwd:
@@ -1110,7 +1056,11 @@ def request_launch(
         args_dict["displayExtendedBacktrace"] = displayExtendedBacktrace
         if commandEscapePrefix is not None:
             args_dict["commandEscapePrefix"] = commandEscapePrefix
-        command_dict = {"command": "launch", "type": "request", "arguments": 
args_dict}
+        command_dict: Request = {
+            "command": "launch",
+            "type": "request",
+            "arguments": args_dict,
+        }
         return self._send_recv(command_dict)
 
     def request_next(self, threadId, granularity="statement"):
@@ -1195,7 +1145,7 @@ def request_setBreakpoints(self, source: Source, 
line_array, data=None):
                 breakpoints.append(bp)
             args_dict["breakpoints"] = breakpoints
 
-        command_dict = {
+        command_dict: Request = {
             "command": "setBreakpoints",
             "type": "request",
             "arguments": args_dict,
@@ -1206,12 +1156,12 @@ def request_setBreakpoints(self, source: Source, 
line_array, data=None):
         return response
 
     def request_setExceptionBreakpoints(
-        self, *, filters: list[str] = [], filter_options: list[dict] = []
+        self, *, filters: List[str] = [], filter_options: List[dict] = []
     ):
         args_dict = {"filters": filters}
         if filter_options:
             args_dict["filterOptions"] = filter_options
-        command_dict = {
+        command_dict: Request = {
             "command": "setExceptionBreakpoints",
             "type": "request",
             "arguments": args_dict,
@@ -1228,7 +1178,7 @@ def request_setFunctionBreakpoints(self, names, 
condition=None, hitCondition=Non
                 bp["hitCondition"] = hitCondition
             breakpoints.append(bp)
         args_dict = {"breakpoints": breakpoints}
-        command_dict = {
+        command_dict: Request = {
             "command": "setFunctionBreakpoints",
             "type": "request",
             "arguments": args_dict,
@@ -1249,7 +1199,7 @@ def request_dataBreakpointInfo(
             "name": name,
             "frameId": stackFrame["id"],
         }
-        command_dict = {
+        command_dict: Request = {
             "command": "dataBreakpointInfo",
             "type": "request",
             "arguments": args_dict,
@@ -1266,7 +1216,7 @@ def request_setDataBreakpoint(self, dataBreakpoints):
         }
         """
         args_dict = {"breakpoints": dataBreakpoints}
-        command_dict = {
+        command_dict: Request = {
             "command": "setDataBreakpoints",
             "type": "request",
             "arguments": args_dict,
@@ -1275,7 +1225,7 @@ def request_setDataBreakpoint(self, dataBreakpoints):
 
     def request_compileUnits(self, moduleId):
         args_dict = {"moduleId": moduleId}
-        command_dict = {
+        command_dict: Request = {
             "command": "compileUnits",
             "type": "request",
             "arguments": args_dict,
@@ -1287,7 +1237,7 @@ def request_completions(self, text, frameId=None):
         args_dict = {"text": text, "column": len(text) + 1}
         if frameId:
             args_dict["frameId"] = frameId
-        command_dict = {
+        command_dict: Request = {
             "command": "completions",
             "type": "request",
             "arguments": args_dict,
@@ -1317,7 +1267,7 @@ def request_moduleSymbols(
         startIndex: int = 0,
         count: int = 0,
     ):
-        command_dict = {
+        command_dict: Request = {
             "command": "__lldb_moduleSymbols",
             "type": "request",
             "arguments": {
@@ -1341,7 +1291,7 @@ def request_stackTrace(
             args_dict["levels"] = levels
         if format is not None:
             args_dict["format"] = format
-        command_dict = {
+        command_dict: Request = {
             "command": "stackTrace",
             "type": "request",
             "arguments": args_dict,
@@ -1375,7 +1325,7 @@ def request_source(
             raise ValueError(
                 "request_source requires either source or sourceReference not 
both"
             )
-        command_dict = {
+        command_dict: Request = {
             "command": "source",
             "type": "request",
             "arguments": {
@@ -1391,7 +1341,11 @@ def request_threads(self):
         "stopped" events since those contain more information about why a
         thread actually stopped. Returns an array of thread dictionaries
         with information about all threads"""
-        command_dict = {"command": "threads", "type": "request", "arguments": 
{}}
+        command_dict: Request = {
+            "command": "threads",
+            "type": "request",
+            "arguments": {},
+        }
         response = self._send_recv(command_dict)
         if not response["success"]:
             self.threads = None
@@ -1400,21 +1354,7 @@ def request_threads(self):
         # Fill in "self.threads" correctly so that clients that call
         # self.get_threads() or self.get_thread_id(...) can get information
         # on threads when the process is stopped.
-        if "threads" in body:
-            self.threads = body["threads"]
-            for thread in self.threads:
-                # Copy the thread dictionary so we can add key/value pairs to
-                # it without affecting the original info from the "threads"
-                # command.
-                tid = thread["id"]
-                if tid in self.thread_stop_reasons:
-                    thread_stop_info = self.thread_stop_reasons[tid]
-                    copy_keys = ["reason", "description", "text"]
-                    for key in copy_keys:
-                        if key in thread_stop_info:
-                            thread[key] = thread_stop_info[key]
-        else:
-            self.threads = None
+        self.threads = body.get("threads", None)
         return response
 
     def request_variables(
@@ -1427,7 +1367,7 @@ def request_variables(
             args_dict["count"] = count
         if is_hex is not None:
             args_dict["format"] = {"hex": is_hex}
-        command_dict = {
+        command_dict: Request = {
             "command": "variables",
             "type": "request",
             "arguments": args_dict,
@@ -1442,7 +1382,7 @@ def request_setVariable(self, containingVarRef, name, 
value, id=None):
         }
         if id is not None:
             args_dict["id"] = id
-        command_dict = {
+        command_dict: Request = {
             "command": "setVariable",
             "type": "request",
             "arguments": args_dict,
@@ -1453,7 +1393,7 @@ def request_locations(self, locationReference):
         args_dict = {
             "locationReference": locationReference,
         }
-        command_dict = {
+        command_dict: Request = {
             "command": "locations",
             "type": "request",
             "arguments": args_dict,
@@ -1465,7 +1405,7 @@ def request_testGetTargetBreakpoints(self):
         set breakpoint infos for all breakpoints currently set in the
         target.
         """
-        command_dict = {
+        command_dict: Request = {
             "command": "_testGetTargetBreakpoints",
             "type": "request",
             "arguments": {},
@@ -1473,9 +1413,7 @@ def request_testGetTargetBreakpoints(self):
         return self._send_recv(command_dict)
 
     def terminate(self):
-        self.send.close()
-        if self._recv_thread.is_alive():
-            self._recv_thread.join()
+        self._send.close()
 
     def request_setInstructionBreakpoints(self, memory_reference=[]):
         breakpoints = []
@@ -1485,7 +1423,7 @@ def request_setInstructionBreakpoints(self, 
memory_reference=[]):
             }
             breakpoints.append(args_dict)
         args_dict = {"breakpoints": breakpoints}
-        command_dict = {
+        command_dict: Request = {
             "command": "setInstructionBreakpoints",
             "type": "request",
             "arguments": args_dict,
@@ -1494,69 +1432,96 @@ def request_setInstructionBreakpoints(self, 
memory_reference=[]):
 
 
 class DebugAdapterServer(DebugCommunication):
+    process: Optional[asyncio.subprocess.Process]
+    connection: Optional[str]
+
     def __init__(
         self,
-        executable: Optional[str] = None,
+        loop: asyncio.AbstractEventLoop,
+        recv: asyncio.StreamReader,
+        send: asyncio.StreamWriter,
+        init_commands: List[str] = [],
+        log_file: Optional[str] = None,
+        process: Optional[asyncio.subprocess.Process] = None,
         connection: Optional[str] = None,
-        init_commands: list[str] = [],
-        log_file: Optional[TextIO] = None,
-        env: Optional[dict[str, str]] = None,
-        additional_args: list[str] = [],
     ):
-        self.process = None
-        self.connection = None
-        if executable is not None:
-            process, connection = DebugAdapterServer.launch(
-                executable=executable,
-                connection=connection,
-                env=env,
-                log_file=log_file,
-                additional_args=additional_args,
-            )
-            self.process = process
-            self.connection = connection
+        super().__init__(loop, recv, send, init_commands, log_file)
+        self.process = process
+        self.connection = connection
 
-        if connection is not None:
-            scheme, address = connection.split("://")
-            if scheme == "unix-connect":  # unix-connect:///path
-                s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
-                s.connect(address)
-            elif scheme == "connection":  # connection://[host]:port
-                host, port = address.rsplit(":", 1)
-                # create_connection with try both ipv4 and ipv6.
-                s = socket.create_connection((host.strip("[]"), int(port)))
-            else:
-                raise ValueError("invalid connection: {}".format(connection))
-            DebugCommunication.__init__(
-                self, s.makefile("rb"), s.makefile("wb"), init_commands, 
log_file
-            )
-            self.connection = connection
+    @classmethod
+    async def connect(
+        cls,
+        *,
+        connection: str,
+        log_file: Optional[str] = None,
+        init_commands: list = [],
+    ) -> "DebugAdapterServer":
+        scheme, address = connection.split("://")
+        if scheme == "unix-connect":  # unix-connect:///path
+            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+            sock.connect(address)
+        elif scheme == "connection":  # connection://[host]:port
+            host, port = address.rsplit(":", 1)
+            # create_connection will try both IPv4 and IPv6.
+            sock = socket.create_connection((host.strip("[]"), int(port)))
         else:
-            DebugCommunication.__init__(
-                self, self.process.stdout, self.process.stdin, init_commands, 
log_file
-            )
+            raise ValueError("invalid connection: {}".format(connection))
+
+        (r, w) = await asyncio.open_connection(sock=sock)
+        return DebugAdapterServer(
+            asyncio.get_running_loop(),
+            r,
+            w,
+            init_commands[:],
+            log_file,
+            connection=connection,
+        )
 
     @classmethod
-    def launch(
+    async def spawn(
+        cls,
+        /,
+        executable: Optional[str] = None,
+        *args: str,
+        init_commands: list = [],
+        log_file: Optional[str] = None,
+        env: Dict[str, str] = {},
+    ) -> "DebugAdapterServer":
+        (process, _) = await cls.launch(
+            executable,
+            *args,
+            env=env.copy(),
+            log_file=log_file,
+        )
+        return DebugAdapterServer(
+            asyncio.get_running_loop(),
+            process.stdout,
+            process.stdin,
+            init_commands[:],
+            log_file,
+            process,
+        )
+
+    @classmethod
+    async def launch(
         cls,
-        *,
         executable: str,
-        env: Optional[dict[str, str]] = None,
-        log_file: Optional[TextIO] = None,
+        *args: str,
+        env: Dict[str, str] = {},
+        log_file: Optional[str] = None,
         connection: Optional[str] = None,
         connection_timeout: Optional[int] = None,
-        additional_args: list[str] = [],
-    ) -> tuple[subprocess.Popen, Optional[str]]:
+    ) -> tuple[asyncio.subprocess.Process, Optional[str]]:
         adapter_env = os.environ.copy()
         if env is not None:
             adapter_env.update(env)
 
         if log_file:
             adapter_env["LLDBDAP_LOG"] = log_file
-        args = [executable]
 
         # Add additional arguments first (like --no-lldbinit)
-        args.extend(additional_args)
+        args = list(args)
 
         if connection is not None:
             args.append("--connection")
@@ -1566,11 +1531,11 @@ def launch(
             args.append("--connection-timeout")
             args.append(str(connection_timeout))
 
-        process = subprocess.Popen(
-            args,
-            stdin=subprocess.PIPE,
-            stdout=subprocess.PIPE,
-            stderr=sys.stderr,
+        process = await asyncio.create_subprocess_exec(
+            executable,
+            *args,
+            stdin=asyncio.subprocess.PIPE,
+            stdout=asyncio.subprocess.PIPE,
             env=adapter_env,
         )
 
@@ -1580,18 +1545,18 @@ def launch(
         # lldb-dap will print the listening address once the listener is
         # made to stdout. The listener is formatted like
         # `connection://host:port` or `unix-connection:///path`.
-        expected_prefix = "Listening for: "
-        out = process.stdout.readline().decode()
+        expected_prefix = b"Listening for: "
+        out = await process.stdout.readline()
         if not out.startswith(expected_prefix):
-            process.kill()
+            process.terminate()
             raise ValueError(
-                "lldb-dap failed to print listening address, expected '{}', 
got '{}'".format(
-                    expected_prefix, out
-                )
+                f"lldb-dap failed to print listening address, expected 
'{expected_prefix}', got '{out}'"
             )
 
         # If the listener expanded into multiple addresses, use the first.
-        connection = 
out.removeprefix(expected_prefix).rstrip("\r\n").split(",", 1)[0]
+        connection = (
+            out.removeprefix(expected_prefix).rstrip(b"\r\n").split(b",", 
1)[0].decode()
+        )
 
         return (process, connection)
 
@@ -1601,23 +1566,10 @@ def get_pid(self) -> int:
         return -1
 
     def terminate(self):
-        try:
-            if self.process is not None:
-                process = self.process
-                self.process = None
-                try:
-                    # When we close stdin it should signal the lldb-dap that no
-                    # new messages will arrive and it should shutdown on its
-                    # own.
-                    process.stdin.close()
-                    process.wait(timeout=20)
-                except subprocess.TimeoutExpired:
-                    process.kill()
-                    process.wait()
-                if process.returncode != 0:
-                    raise DebugAdapterProcessError(process.returncode)
-        finally:
-            super(DebugAdapterServer, self).terminate()
+        if self.process and self.process.returncode is None:
+            self._send.close()
+            self._loop.run_until_complete(self.process.wait())
+        super(DebugAdapterServer, self).terminate()
 
 
 class DebugAdapterError(Exception):
@@ -1955,8 +1907,8 @@ def main():
     )
     if options.debug:
         raw_input('Waiting for debugger to attach pid "%i"' % (dbg.get_pid()))
-    if options.replay:
-        dbg.replay_packets(options.replay)
+    # if options.replay:
+    #     dbg.replay_packets(options.replay)
     else:
         run_vscode(dbg, args, options)
     dbg.terminate()
diff --git 
a/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/lldbdap_testcase.py 
b/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/lldbdap_testcase.py
index f7b1ed80fceb5..579a69b8404d9 100644
--- a/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/lldbdap_testcase.py
+++ b/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/lldbdap_testcase.py
@@ -1,10 +1,12 @@
+import asyncio
 import os
 import time
+import functools
 from typing import Optional, Callable, Any, List, Union
 import uuid
 
 import dap_server
-from dap_server import Source
+from dap_server import source, Source
 from lldbsuite.test.decorators import skipIf
 from lldbsuite.test.lldbtest import *
 from lldbsuite.test import lldbplatformutil
@@ -21,34 +23,58 @@ class DAPTestCaseBase(TestBase):
     DEFAULT_TIMEOUT = 10 * (10 if ("ASAN_OPTIONS" in os.environ) else 1)
     NO_DEBUG_INFO_TESTCASE = True
 
+    @functools.cached_property
+    def loop(self) -> asyncio.AbstractEventLoop:
+        return asyncio.new_event_loop()
+
     def create_debug_adapter(
         self,
-        lldbDAPEnv: Optional[dict[str, str]] = None,
+        *args: str,
+        env: dict[str, str] = {},
         connection: Optional[str] = None,
-        additional_args: Optional[list[str]] = None,
     ):
         """Create the Visual Studio Code debug adapter"""
         self.assertTrue(
             is_exe(self.lldbDAPExec), "lldb-dap must exist and be executable"
         )
         log_file_path = self.getBuildArtifact("dap.txt")
-        self.dap_server = dap_server.DebugAdapterServer(
-            executable=self.lldbDAPExec,
-            connection=connection,
-            init_commands=self.setUpCommands(),
-            log_file=log_file_path,
-            env=lldbDAPEnv,
-            additional_args=additional_args or [],
+        if connection:
+            _, connection = self.loop.run_until_complete(
+                dap_server.DebugAdapterServer.launch(
+                    self.lldbDAPExec,
+                    *args,
+                    env=env.copy(),
+                    log_file=log_file_path,
+                    connection=connection,
+                )
+            )
+            self.dap_server = self.loop.run_until_complete(
+                dap_server.DebugAdapterServer.connect(
+                    connection=connection,
+                    init_commands=self.setUpCommands(),
+                    log_file=log_file_path,
+                )
+            )
+            return
+
+        self.dap_server = self.loop.run_until_complete(
+            dap_server.DebugAdapterServer.spawn(
+                self.lldbDAPExec,
+                *args,
+                init_commands=self.setUpCommands(),
+                log_file=log_file_path,
+                env=env.copy(),
+            )
         )
 
     def build_and_create_debug_adapter(
         self,
-        lldbDAPEnv: Optional[dict[str, str]] = None,
+        *args: str,
+        env: dict[str, str] = {},
         dictionary: Optional[dict] = None,
-        additional_args: Optional[list[str]] = None,
     ):
         self.build(dictionary=dictionary)
-        self.create_debug_adapter(lldbDAPEnv, additional_args=additional_args)
+        self.create_debug_adapter(*args, env=env.copy())
 
     def build_and_create_debug_adapter_for_attach(self):
         """Variant of build_and_create_debug_adapter that builds a uniquely
@@ -74,7 +100,7 @@ def set_source_breakpoints_assembly(
         self, source_reference, lines, data=None, wait_for_resolve=True
     ):
         return self.set_source_breakpoints_from_source(
-            Source.build(source_reference=source_reference),
+            source(source_reference=source_reference),
             lines,
             data,
             wait_for_resolve,
@@ -91,9 +117,7 @@ def set_source_breakpoints_from_source(
         if response is None:
             return []
         breakpoints = response["body"]["breakpoints"]
-        breakpoint_ids = []
-        for breakpoint in breakpoints:
-            breakpoint_ids.append("%i" % (breakpoint["id"]))
+        breakpoint_ids = [b["id"] for b in breakpoints if "id" in b]
         if wait_for_resolve:
             self.wait_for_breakpoints_to_resolve(breakpoint_ids)
         return breakpoint_ids
@@ -111,16 +135,14 @@ def set_function_breakpoints(
         if response is None:
             return []
         breakpoints = response["body"]["breakpoints"]
-        breakpoint_ids = []
-        for breakpoint in breakpoints:
-            breakpoint_ids.append("%i" % (breakpoint["id"]))
+        breakpoint_ids = [b["id"] for b in breakpoints if "id" in b]
         if wait_for_resolve:
             self.wait_for_breakpoints_to_resolve(breakpoint_ids)
         return breakpoint_ids
 
     def wait_for_breakpoints_to_resolve(
-        self, breakpoint_ids: list[str], timeout: Optional[float] = 
DEFAULT_TIMEOUT
-    ):
+        self, breakpoint_ids: list[int], timeout: Optional[float] = 
DEFAULT_TIMEOUT
+    ) -> None:
         unresolved_breakpoints = 
self.dap_server.wait_for_breakpoints_to_be_verified(
             breakpoint_ids, timeout
         )
@@ -162,28 +184,23 @@ def verify_breakpoint_hit(
         any breakpoint location in the "breakpoint_ids" array.
         "breakpoint_ids" should be a list of breakpoint ID strings
         (["1", "2"]). The return value from self.set_source_breakpoints()
-        or self.set_function_breakpoints() can be passed to this function"""
-        stopped_events = self.dap_server.wait_for_stopped(timeout)
+        or self.set_function_breakpoints() can be passed to this function."""
+        self.dap_server.wait_for_stopped(timeout)
         normalized_bp_ids = [str(b) for b in breakpoint_ids]
-        for stopped_event in stopped_events:
-            if "body" in stopped_event:
-                body = stopped_event["body"]
-                if "reason" not in body:
-                    continue
-                if (
-                    body["reason"] != "breakpoint"
-                    and body["reason"] != "instruction breakpoint"
-                ):
-                    continue
-                if "hitBreakpointIds" not in body:
-                    continue
-                hit_breakpoint_ids = body["hitBreakpointIds"]
-                for bp in hit_breakpoint_ids:
-                    if str(bp) in normalized_bp_ids:
-                        return
-        self.assertTrue(
-            False,
-            f"breakpoint not hit, wanted breakpoint_ids {breakpoint_ids} in 
stopped_events {stopped_events}",
+        for body in self.dap_server.thread_stop_reasons.values():
+            if (
+                body["reason"] != "breakpoint"
+                and body["reason"] != "instruction breakpoint"
+            ):
+                continue
+            if "hitBreakpointIds" not in body:
+                continue
+            hit_breakpoint_ids = body["hitBreakpointIds"]
+            for bp in hit_breakpoint_ids:
+                if str(bp) in normalized_bp_ids:
+                    return
+        self.fail(
+            f"breakpoint not hit, wanted breakpoint_ids {breakpoint_ids} in 
stop reasons {self.dap_server.thread_stop_reasons}",
         )
 
     def verify_all_breakpoints_hit(self, breakpoint_ids, 
timeout=DEFAULT_TIMEOUT):
@@ -449,18 +466,13 @@ def continue_to_exception_breakpoint(self, filter_label, 
timeout=DEFAULT_TIMEOUT
         )
 
     def continue_to_exit(self, exitCode=0, timeout=DEFAULT_TIMEOUT):
-        self.do_continue()
-        stopped_events = self.dap_server.wait_for_stopped(timeout)
-        self.assertEqual(
-            len(stopped_events), 1, "stopped_events = 
{}".format(stopped_events)
-        )
-        self.assertEqual(
-            stopped_events[0]["event"], "exited", "make sure program ran to 
completion"
-        )
+        if not self.dap_server.is_exited:
+            self.do_continue()
+            self.dap_server.wait_for_exited(timeout)
         self.assertEqual(
-            stopped_events[0]["body"]["exitCode"],
+            self.dap_server.exit_status,
             exitCode,
-            "exitCode == %i" % (exitCode),
+            f"want exitCode == {exitCode}, got {self.dap_server.exit_status}",
         )
 
     def disassemble(self, threadId=None, frameIndex=None):
@@ -517,6 +529,7 @@ def cleanup():
             if disconnectAutomatically:
                 self.dap_server.request_disconnect(terminateDebuggee=True)
             self.dap_server.terminate()
+            # self.dap_server.dump_log()
 
         # Execute the cleanup function during test case tear down.
         self.addTearDownHook(cleanup)
@@ -546,6 +559,7 @@ def cleanup():
             if disconnectAutomatically:
                 self.dap_server.request_disconnect(terminateDebuggee=True)
             self.dap_server.terminate()
+            # self.dap_server.dump_log()
 
         # Execute the cleanup function during test case tear down.
         self.addTearDownHook(cleanup)
@@ -563,13 +577,13 @@ def build_and_launch(
         self,
         program,
         *,
-        lldbDAPEnv: Optional[dict[str, str]] = None,
+        lldbDAPEnv: dict[str, str] = {},
         **kwargs,
     ):
         """Build the default Makefile target, create the DAP debug adapter,
         and launch the process.
         """
-        self.build_and_create_debug_adapter(lldbDAPEnv)
+        self.build_and_create_debug_adapter(env=lldbDAPEnv.copy())
         self.assertTrue(os.path.exists(program), "executable must exist")
 
         return self.launch(program, **kwargs)
diff --git 
a/lldb/test/API/tools/lldb-dap/breakpoint-assembly/TestDAP_breakpointAssembly.py
 
b/lldb/test/API/tools/lldb-dap/breakpoint-assembly/TestDAP_breakpointAssembly.py
index fab109c93a17b..c6b7ac19ce9c4 100644
--- 
a/lldb/test/API/tools/lldb-dap/breakpoint-assembly/TestDAP_breakpointAssembly.py
+++ 
b/lldb/test/API/tools/lldb-dap/breakpoint-assembly/TestDAP_breakpointAssembly.py
@@ -113,7 +113,7 @@ def test_persistent_assembly_breakpoint(self):
                 "Expected one assembly breakpoint to be set",
             )
 
-            persistent_breakpoint_source = 
self.dap_server.resolved_breakpoints[
+            persistent_breakpoint_source = self.dap_server.breakpoints[
                 persistent_breakpoint_ids[0]
             ]["source"]
             self.assertIn(
diff --git a/lldb/test/API/tools/lldb-dap/console/TestDAP_console.py 
b/lldb/test/API/tools/lldb-dap/console/TestDAP_console.py
index ceddaeb50cd3b..85da471a2435c 100644
--- a/lldb/test/API/tools/lldb-dap/console/TestDAP_console.py
+++ b/lldb/test/API/tools/lldb-dap/console/TestDAP_console.py
@@ -1,10 +1,8 @@
 """
-Test lldb-dap setBreakpoints request
+Test lldb-dap console output.
 """
 
-import dap_server
 import lldbdap_testcase
-from lldbsuite.test import lldbutil
 from lldbsuite.test.decorators import *
 from lldbsuite.test.lldbtest import *
 
diff --git a/lldb/test/API/tools/lldb-dap/io/TestDAP_io.py 
b/lldb/test/API/tools/lldb-dap/io/TestDAP_io.py
index af5c62a8c4eb5..51cd7c84b240f 100644
--- a/lldb/test/API/tools/lldb-dap/io/TestDAP_io.py
+++ b/lldb/test/API/tools/lldb-dap/io/TestDAP_io.py
@@ -7,24 +7,26 @@
 from lldbsuite.test.decorators import *
 import lldbdap_testcase
 import dap_server
+import asyncio
 
 EXIT_FAILURE = 1
 EXIT_SUCCESS = 0
 
 
-class TestDAP_io(lldbdap_testcase.DAPTestCaseBase):
-    def launch(self):
+class TestDAP_io(unittest.IsolatedAsyncioTestCase, 
lldbdap_testcase.DAPTestCaseBase):
+    async def launch(self):
         log_file_path = self.getBuildArtifact("dap.txt")
-        process, _ = dap_server.DebugAdapterServer.launch(
-            executable=self.lldbDAPExec, log_file=log_file_path
+        process, _ = await dap_server.DebugAdapterServer.launch(
+            executable=self.lldbDAPExec,
+            log_file=log_file_path,
         )
 
-        def cleanup():
+        async def cleanup():
             # If the process is still alive, terminate it.
-            if process.poll() is None:
+            if process.returncode is None:
                 process.terminate()
-                process.wait()
-            stdout_data = process.stdout.read().decode()
+                await process.wait()
+            stdout_data = (await process.stdout.read()).decode()
             print("========= STDOUT =========", file=sys.stderr)
             print(stdout_data, file=sys.stderr)
             print("========= END =========", file=sys.stderr)
@@ -34,54 +36,65 @@ def cleanup():
             print("========= END =========", file=sys.stderr)
 
         # Execute the cleanup function during test case tear down.
-        self.addTearDownHook(cleanup)
+        self.addAsyncCleanup(cleanup)
 
         return process
 
-    def test_eof_immediately(self):
+    async def test_eof_immediately(self):
         """
         lldb-dap handles EOF without any other input.
         """
-        process = self.launch()
+        process = await self.launch()
         process.stdin.close()
-        self.assertEqual(process.wait(timeout=5.0), EXIT_SUCCESS)
+        # Bound the wait: a hung lldb-dap must fail the test, not stall the suite.
+        self.assertEqual(
+            await asyncio.wait_for(process.wait(), timeout=5.0), EXIT_SUCCESS
+        )
 
-    def test_invalid_header(self):
+    async def test_invalid_header(self):
         """
         lldb-dap returns a failure exit code when the input stream is closed
         with a malformed request header.
         """
-        process = self.launch()
+        process = await self.launch()
         process.stdin.write(b"not the correct message header")
         process.stdin.close()
-        self.assertEqual(process.wait(timeout=5.0), EXIT_FAILURE)
+        self.assertEqual(
+            await asyncio.wait_for(process.wait(), timeout=5.0), EXIT_FAILURE
+        )
 
-    def test_partial_header(self):
+    async def test_partial_header(self):
         """
         lldb-dap returns a failure exit code when the input stream is closed
         with an incomplete message header is in the message buffer.
         """
-        process = self.launch()
+        process = await self.launch()
         process.stdin.write(b"Content-Length: ")
         process.stdin.close()
-        self.assertEqual(process.wait(timeout=5.0), EXIT_FAILURE)
+        self.assertEqual(
+            await asyncio.wait_for(process.wait(), timeout=5.0), EXIT_FAILURE
+        )
 
-    def test_incorrect_content_length(self):
+    async def test_incorrect_content_length(self):
         """
         lldb-dap returns a failure exit code when reading malformed content
         length headers.
         """
-        process = self.launch()
+        process = await self.launch()
         process.stdin.write(b"Content-Length: abc")
         process.stdin.close()
-        self.assertEqual(process.wait(timeout=5.0), EXIT_FAILURE)
+        self.assertEqual(
+            await asyncio.wait_for(process.wait(), timeout=5.0), EXIT_FAILURE
+        )
 
-    def test_partial_content_length(self):
+    async def test_partial_content_length(self):
         """
         lldb-dap returns a failure exit code when the input stream is closed
         with a partial message in the message buffer.
         """
-        process = self.launch()
+        process = await self.launch()
         process.stdin.write(b"Content-Length: 10\r\n\r\n{")
         process.stdin.close()
-        self.assertEqual(process.wait(timeout=5.0), EXIT_FAILURE)
+        self.assertEqual(
+            await asyncio.wait_for(process.wait(), timeout=5.0), EXIT_FAILURE
+        )
diff --git a/lldb/test/API/tools/lldb-dap/launch/TestDAP_launch.py 
b/lldb/test/API/tools/lldb-dap/launch/TestDAP_launch.py
index 8db2316e73fc8..21962b04bf48e 100644
--- a/lldb/test/API/tools/lldb-dap/launch/TestDAP_launch.py
+++ b/lldb/test/API/tools/lldb-dap/launch/TestDAP_launch.py
@@ -84,17 +84,17 @@ def test_termination(self):
         """
         self.create_debug_adapter()
         # The underlying lldb-dap process must be alive
-        self.assertEqual(self.dap_server.process.poll(), None)
+        self.assertEqual(self.dap_server.process.returncode, None)
 
         # The lldb-dap process should finish even though
         # we didn't close the communication socket explicitly
         self.dap_server.request_disconnect()
 
         # Wait until the underlying lldb-dap process dies.
-        self.dap_server.process.wait(timeout=self.DEFAULT_TIMEOUT)
+        self.dap_server.terminate()
 
         # Check the return code
-        self.assertEqual(self.dap_server.process.poll(), 0)
+        self.assertEqual(self.dap_server.process.returncode, 0)
 
     def test_stopOnEntry(self):
         """
@@ -130,17 +130,17 @@ def test_cwd(self):
         output = self.get_stdout()
         self.assertTrue(output and len(output) > 0, "expect program output")
         lines = output.splitlines()
-        found = False
-        for line in lines:
-            if line.startswith('cwd = "'):
-                quote_path = '"%s"' % (program_parent_dir)
-                found = True
-                self.assertIn(
-                    quote_path,
-                    line,
-                    "working directory '%s' not in '%s'" % 
(program_parent_dir, line),
-                )
-        self.assertTrue(found, "verified program working directory")
+        self.assertIn(f'cwd = "{program_parent_dir}"', lines)
+        # for line in lines:
+        #     if line.startswith('cwd = "'):
+        #         quote_path = '"%s"' % (program_parent_dir)
+        #         found = True
+        #         self.assertIn(
+        #             quote_path,
+        #             line,
+        #             "working directory '%s' not in '%s'" % 
(program_parent_dir, line),
+        #         )
+        # self.assertTrue(found, "verified program working directory")
 
     def test_debuggerRoot(self):
         """
@@ -408,14 +408,14 @@ def test_commands(self):
         # Get output from the console. This should contain both the
         # "stopCommands" that were run after the first breakpoint was hit
         self.continue_to_breakpoints(breakpoint_ids)
-        output = self.get_console()
+        output = self.collect_console(pattern=stopCommands[-1])
         self.verify_commands("stopCommands", output, stopCommands)
 
         # Continue again and hit the second breakpoint.
         # Get output from the console. This should contain both the
         # "stopCommands" that were run after the second breakpoint was hit
         self.continue_to_breakpoints(breakpoint_ids)
-        output = self.get_console()
+        output = self.collect_console(pattern=stopCommands[-1])
         self.verify_commands("stopCommands", output, stopCommands)
 
         # Continue until the program exits
@@ -423,7 +423,7 @@ def test_commands(self):
         # Get output from the console. This should contain both the
         # "exitCommands" that were run after the second breakpoint was hit
         # and the "terminateCommands" due to the debugging session ending
-        output = self.collect_console(pattern=terminateCommands[0])
+        output = self.collect_console(pattern=terminateCommands[-1])
         self.verify_commands("exitCommands", output, exitCommands)
         self.verify_commands("terminateCommands", output, terminateCommands)
 
@@ -476,14 +476,14 @@ def test_extra_launch_commands(self):
         self.verify_commands("launchCommands", output, launchCommands)
         # Verify the "stopCommands" here
         self.continue_to_next_stop()
-        output = self.get_console()
+        output = self.collect_console(pattern=stopCommands[-1])
         self.verify_commands("stopCommands", output, stopCommands)
 
         # Continue and hit the second breakpoint.
         # Get output from the console. This should contain both the
         # "stopCommands" that were run after the first breakpoint was hit
         self.continue_to_next_stop()
-        output = self.get_console()
+        output = self.collect_console(pattern=stopCommands[-1])
         self.verify_commands("stopCommands", output, stopCommands)
 
         # Continue until the program exits
@@ -600,7 +600,8 @@ def test_no_lldbinit_flag(self):
 
             # Test with --no-lldbinit flag (should NOT source .lldbinit)
             self.build_and_create_debug_adapter(
-                lldbDAPEnv={"HOME": temp_home}, 
additional_args=["--no-lldbinit"]
+                "--no-lldbinit",
+                env={"HOME": temp_home},
             )
             program = self.getBuildArtifact("a.out")
 
@@ -611,7 +612,7 @@ def test_no_lldbinit_flag(self):
             self.launch(program, initCommands=initCommands, stopOnEntry=True)
 
             # Get console output to verify the setting was NOT set from 
.lldbinit
-            output = self.get_console()
+            output = self.collect_console(pattern=initCommands[-1])
             self.assertTrue(output and len(output) > 0, "expect console 
output")
 
             # Verify the setting has default value, not "never" from .lldbinit
diff --git a/lldb/test/API/tools/lldb-dap/server/TestDAP_server.py 
b/lldb/test/API/tools/lldb-dap/server/TestDAP_server.py
index 12b321cf42778..72c6029e6c566 100644
--- a/lldb/test/API/tools/lldb-dap/server/TestDAP_server.py
+++ b/lldb/test/API/tools/lldb-dap/server/TestDAP_server.py
@@ -6,6 +6,7 @@
 import signal
 import tempfile
 import time
+import asyncio
 
 import dap_server
 from lldbsuite.test.decorators import *
@@ -14,20 +15,28 @@
 
 
 class TestDAP_server(lldbdap_testcase.DAPTestCaseBase):
+    loop = asyncio.new_event_loop()
+
     def start_server(
         self, connection, connection_timeout=None, 
wait_seconds_for_termination=None
     ):
         log_file_path = self.getBuildArtifact("dap.txt")
-        (process, connection) = dap_server.DebugAdapterServer.launch(
-            executable=self.lldbDAPExec,
-            connection=connection,
-            connection_timeout=connection_timeout,
-            log_file=log_file_path,
+        (process, connection) = self.loop.run_until_complete(
+            dap_server.DebugAdapterServer.launch(
+                executable=self.lldbDAPExec,
+                connection=connection,
+                connection_timeout=connection_timeout,
+                log_file=log_file_path,
+            )
         )
 
         def cleanup():
             if wait_seconds_for_termination is not None:
-                process.wait(wait_seconds_for_termination)
+                self.loop.run_until_complete(
+                    asyncio.wait_for(
+                        process.wait(), timeout=wait_seconds_for_termination
+                    )
+                )
             else:
                 process.terminate()
 
@@ -36,8 +45,10 @@ def cleanup():
         return (process, connection)
 
     def run_debug_session(self, connection, name, 
sleep_seconds_in_middle=None):
-        self.dap_server = dap_server.DebugAdapterServer(
-            connection=connection,
+        self.dap_server = self.loop.run_until_complete(
+            dap_server.DebugAdapterServer.connect(
+                connection=connection,
+            )
         )
         program = self.getBuildArtifact("a.out")
         source = "main.c"
@@ -49,7 +60,7 @@ def run_debug_session(self, connection, name, sleep_seconds_in_middle=None):
             disconnectAutomatically=False,
         )
         if sleep_seconds_in_middle is not None:
-            time.sleep(sleep_seconds_in_middle)
+            self.loop.run_until_complete(asyncio.sleep(sleep_seconds_in_middle))
         self.set_source_breakpoints(source, [breakpoint_line])
         self.continue_to_next_stop()
         self.continue_to_exit()
@@ -92,8 +103,10 @@ def test_server_interrupt(self):
         """
         self.build()
         (process, connection) = 
self.start_server(connection="listen://localhost:0")
-        self.dap_server = dap_server.DebugAdapterServer(
-            connection=connection,
+        self.dap_server = self.loop.run_until_complete(
+            dap_server.DebugAdapterServer.connect(
+                connection=connection,
+            )
         )
         program = self.getBuildArtifact("a.out")
         source = "main.c"
@@ -111,8 +124,8 @@ def test_server_interrupt(self):
         process.send_signal(signal.SIGINT)
 
         # Wait for both events since they can happen in any order.
-        self.dap_server.wait_for_event(["terminated", "exited"])
-        self.dap_server.wait_for_event(["terminated", "exited"])
+        self.dap_server.wait_for_exited()
+        self.dap_server.wait_for_terminated()
         self.assertIsNotNone(
             self.dap_server.exit_status,
             "Process exited before interrupting lldb-dap server",
@@ -131,7 +144,7 @@ def test_connection_timeout_at_server_start(self):
         )
 
     @skipIfWindows
     def test_connection_timeout_long_debug_session(self):
         """
-        Test launching lldb-dap in server mode with connection timeout and terminating the server after the a long debug session.
+        Test launching lldb-dap in server mode with connection timeout and terminating the server after a long debug session.
         """

_______________________________________________
lldb-commits mailing list
[email protected]
https://lists.llvm.org/cgi-bin/mailman/listinfo/lldb-commits

Reply via email to