owenpshaw created this revision.
owenpshaw added reviewers: clayborg, labath.
Adds new utilities that make it easier to write test cases for lldb acting as a
client over a gdb-remote connection.
- A GDBRemoteTestBase class that starts a mock GDB server and provides an easy
way to check client packets
- A MockGDBServer that, via MockGDBServerResponder, can be made to issue server
responses that test client behavior.
- Utility functions for handling common data encoding/decoding
- Utility functions for creating dummy targets from YAML files
----
Split from the review at https://reviews.llvm.org/D42145, which was a new
feature that necessitated the new testing capabilities.
https://reviews.llvm.org/D42195
Files:
packages/Python/lldbsuite/test/functionalities/gdb_remote_client/TestGDBRemoteClient.py
packages/Python/lldbsuite/test/functionalities/gdb_remote_client/a.yaml
packages/Python/lldbsuite/test/functionalities/gdb_remote_client/gdbclientutils.py
Index: packages/Python/lldbsuite/test/functionalities/gdb_remote_client/gdbclientutils.py
===================================================================
--- /dev/null
+++ packages/Python/lldbsuite/test/functionalities/gdb_remote_client/gdbclientutils.py
@@ -0,0 +1,432 @@
+import os
+import os.path
+import subprocess
+import threading
+import socket
+import lldb
+from lldbsuite.test.lldbtest import *
+from lldbsuite.test import lldbtest_config
+
+
+def yaml2obj_executable():
+ """
+ Get the path to the yaml2obj executable, which can be used to create test
+ object files from easy to write yaml instructions.
+
+ Throws an Exception if the executable cannot be found.
+ """
+ # Tries to find yaml2obj at the same folder as the lldb
+ path = os.path.join(os.path.dirname(lldbtest_config.lldbExec), "yaml2obj")
+ if os.path.exists(path):
+ return path
+ raise Exception("yaml2obj executable not found")
+
+
+def yaml2elf(yaml_path, elf_path):
+ """
+ Create an ELF file at the given path from a yaml file at the given path.
+
+ Throws a subprocess.CalledProcessError if the ELF could not be created.
+ """
+ yaml2obj = yaml2obj_executable()
+ command = [yaml2obj, "-o=%s" % elf_path, yaml_path]
+ system([command])
+
+
+def checksum(message):
+ """
+ Calculate the GDB server protocol checksum of the message.
+
+ The GDB server protocol uses a simple modulo 256 sum.
+ """
+ check = 0
+ for c in message:
+ check += ord(c)
+ return check % 256
+
+
+def frame_packet(message):
+ """
+ Create a framed packet that's ready to send over the GDB connection
+ channel.
+
+ Framing includes surrounding the message between $ and #, and appending
+ a two character hex checksum.
+ """
+ return "$%s#%02x" % (message, checksum(message))
+
+
+def escape_binary(message):
+ """
+ Escape the binary message using the process described in the GDB server
+ protocol documentation.
+
+ Most bytes are sent through as-is, but $, #, and { are escaped by writing
+ a { followed by the original byte mod 0x20.
+ """
+ out = ""
+ for c in message:
+ d = ord(c)
+ if d in (0x23, 0x24, 0x7d):
+ out += chr(0x7d)
+ out += chr(d ^ 0x20)
+ else:
+ out += c
+ return out
+
+
+def hex_encode_bytes(message):
+ """
+ Encode the binary message by converting each byte into a two-character
+ hex string.
+ """
+ out = ""
+ for c in message:
+ out += "%02x" % ord(c)
+ return out
+
+
+def hex_decode_bytes(hex_bytes):
+ """
+ Decode the hex string into a binary message by converting each two-character
+ hex string into a single output byte.
+ """
+ out = ""
+ hex_len = len(hex_bytes)
+ while i < hex_len - 1:
+ out += chr(int(hex_bytes[i:i + 2]), 16)
+ i += 2
+ return out
+
+
+class MockGDBServerResponder:
+ """
+ A base class for handing client packets and issuing server responses for
+ GDB tests.
+
+ This handles many typical situations, while still allowing subclasses to
+ completely customize their responses.
+
+ Most subclasses will be interested in overriding the other() method, which
+ handles any packet not recognized in the common packet handling code.
+ """
+
+ registerCount = 40
+ packetLog = None
+
+ def __init__(self):
+ self.packetLog = []
+
+ def respond(self, packet):
+ """
+ Return the unframed packet data that the server should issue in response
+ to the given packet received from the client.
+ """
+ self.packetLog.append(packet)
+ if packet == "g":
+ return self.readRegisters()
+ if packet[0] == "G":
+ return self.writeRegisters(packet[1:])
+ if packet[0] == "p":
+ return self.readRegister(int(packet[1:], 16))
+ if packet[0] == "P":
+ register, value = packet[1:].split("=")
+ return self.readRegister(int(register, 16), value)
+ if packet[0] == "m":
+ addr, length = [int(x, 16) for x in packet[1:].split(',')]
+ return self.readMemory(addr, length)
+ if packet[0] == "M":
+ location, encoded_data = packet[1:].split(":")
+ addr, length = [int(x, 16) for x in location.split(',')]
+ return self.writeMemory(addr, encoded_data)
+ if packet[0:7] == "qSymbol":
+ return self.qSymbol(packet[8:])
+ if packet[0:10] == "qSupported":
+ return self.qSupported(packet[11:].split(";"))
+ if packet == "qfThreadInfo":
+ return self.qfThreadInfo()
+ if packet == "qC":
+ return self.qC()
+ if packet == "?":
+ return self.haltReason()
+ if packet[0] == "H":
+ return self.selectThread(packet[1], int(packet[2:], 16))
+ if packet[0:6] == "qXfer:":
+ obj, read, annex, location = packet[6:].split(":")
+ offset, length = [int(x, 16) for x in location.split(',')]
+ data, has_more = self.qXferRead(obj, annex, offset, length)
+ if data is not None:
+ return self._qXferResponse(data, has_more)
+ return ""
+ return self.other(packet)
+
+ def readRegisters(self):
+ return "xxxxxxxx" * self.registerCount
+
+ def readRegister(self, register):
+ return "xxxxxxxx"
+
+ def writeRegisters(self, registers_hex):
+ return "OK"
+
+ def writeRegister(self, register, value_hex):
+ return "OK"
+
+ def readMemory(self, addr, length):
+ return "00" * length
+
+ def writeMemory(self, addr, data_hex):
+ return "OK"
+
+ def qSymbol(self, symbol_args):
+ return "OK"
+
+ def qSupported(self, client_supported):
+ return "PacketSize=3fff;QStartNoAckMode+"
+
+ def qfThreadInfo(self):
+ return "l"
+
+ def qC(self):
+ return "QC0"
+
+ def haltReason(self):
+ # SIGINT is 2, return type is 2 digit hex string
+ return "S02"
+
+ def qXferRead(self, obj, annex, offset, length):
+ return None, False
+
+ def _qXferResponse(self, data, has_more):
+ return "%s%s" % ("m" if has_more else "l", escape_binary(data))
+
+ def selectThread(self, op, thread_id):
+ return "OK"
+
+ def other(self, packet):
+ # empty string means unsupported
+ return ""
+
+
+class MockGDBServer:
+ """
+ A simple TCP-based GDB server that can test client behavior by receiving
+ commands and issuing custom-tailored responses.
+
+ Responses are generated via the .responder property, which should be an
+ instance of a class based on MockGDBServerResponder.
+ """
+
+ responder = None
+ port = 0
+ _socket = None
+ _client = None
+ _thread = None
+ _incomingPacket = None
+ _incomingChecksum = None
+ _shouldSendAck = True
+ _isExpectingAck = False
+
+ def __init__(self, port = 0):
+ self.responder = MockGDBServerResponder()
+ self.port = port
+ self._socket = socket.socket()
+
+ def start(self):
+ # Block until the socket is up, so self.port is available immediately.
+ # Then start a thread that waits for a client connection.
+ addr = ("127.0.0.1", self.port)
+ self._socket.bind(addr)
+ self.port = self._socket.getsockname()[1]
+ self._socket.listen(0)
+ self._thread = threading.Thread(target=self._run)
+ self._thread.start()
+
+ def stop(self):
+ if self._client is not None:
+ self._client.shutdown(socket.SHUT_RDWR)
+ self._client.close()
+ # Would call self._socket.shutdown, but it blocks forever for some
+ # unknown reason. close() works just fine.
+ self._socket.close()
+ self._thread.join()
+ self._thread = None
+
+ def _run(self):
+ # For testing purposes, we only need to worry about one client
+ # connecting just one time.
+ try:
+ # accept() is stubborn and won't fail even when the socket is
+ # shutdown, so we'll use a timeout
+ self._socket.settimeout(2.0)
+ client, client_addr = self._socket.accept()
+ self._client = client
+ # The connected client inherits its timeout from self._socket,
+ # but we'll use a blocking socket for the client
+ self._client.settimeout(None)
+ except:
+ return
+ self._shouldSendAck = True
+ self._isExpectingAck = False
+ data = None
+ while True:
+ try:
+ data = self._client.recv(4096)
+ if data is None or len(data) == 0:
+ break
+ except Exception as e:
+ self._client.close()
+ break
+ self._receive(data)
+
+ def _receive(self, data):
+ i = 0
+ data_len = len(data)
+ while i < data_len:
+ # If we haven't set _incomingPacket to anything yet, it means we're
+ # expecting the start of a new packet.
+ if self._incomingPacket is None:
+ if data[i] == '+':
+ if self._isExpectingAck:
+ # We're expecting an ack from the client, so ignore it.
+ self._isExpectingAck = False
+ else:
+ # Not expecting an ack, so just ack back.
+ self._client.sendall('+')
+ i += 1
+ elif data[i] == '$':
+ self._incomingPacket = ""
+ i += 1
+ else:
+ # Unexpected byte, closing connection to indicate error.
+ self._client.close()
+ return
+ # If we haven't set _incomingChecksum to anything yet, it means
+ # we're collecting bytes, waiting for a # to indicate the end of
+ # packet data.
+ elif self._incomingChecksum is None:
+ while i < data_len:
+ if data[i] == '#':
+ self._incomingChecksum = ""
+ i += 1
+ break
+ self._incomingPacket += data[i]
+ i += 1
+ # If we have set _incomingChecksum, then we're collecting the
+ # two bytes of the checksum string.
+ else:
+ while i < data_len and len(self._incomingChecksum) < 2:
+ self._incomingChecksum += data[i]
+ i += 1
+ if len(self._incomingChecksum) == 2:
+ check = None
+ try:
+ check = int(self._incomingChecksum, 16)
+ except ValueError:
+ # Non-hex checksum, closing connection.
+ self._client.close()
+ return
+ if check != checksum(self._incomingPacket):
+ # Mismatching checksums, closing connection.
+ # Since we're using TCP transport, the checksum can
+ # only be wrong if the client did something wrong.
+ self._client.close()
+ return
+ packet = self._incomingPacket
+ self._incomingPacket = None
+ self._incomingChecksum = None
+ self._handlePacket(packet)
+
+ def _handlePacket(self, packet):
+ response = ""
+ # We'll handle the ack stuff here since it's not something any of the
+ # tests will be concerned about, and it'll get turned off quicly anyway.
+ if self._shouldSendAck:
+ self._client.sendall('+')
+ self._isExpectingAck = True
+ if packet == "QStartNoAckMode":
+ self._shouldSendAck = False
+ response = "OK"
+ elif self.responder is not None:
+ # Delegate everything else to our responder
+ response = self.responder.respond(packet)
+ # Handle packet framing since we don't want to bother tests with it.
+ framed = frame_packet(response)
+ self._client.sendall(framed)
+
+
+class GDBRemoteTestBase(TestBase):
+ """
+ Base class for GDB client tests.
+
+ This class will setup and start a mock GDB server for the test to use.
+ It also provides assertPacketLogContains, which simplifies the checking
+ of packets sent by the client.
+ """
+
+ NO_DEBUG_INFO_TESTCASE = True
+ mydir = TestBase.compute_mydir(__file__)
+ server = None
+ temp_files = None
+
+ def setUp(self):
+ TestBase.setUp(self)
+ self.temp_files = []
+ self.server = MockGDBServer()
+ self.server.start()
+
+ def tearDown(self):
+ for temp_file in self.temp_files:
+ self.RemoveTempFile(temp_file)
+ self.server.stop()
+ self.temp_files = []
+ TestBase.tearDown(self)
+
+ def createTarget(self, yaml_path):
+ """
+ Create an ELF target by auto-generating the ELF based on the given yaml
+ instructions.
+
+ This will track the generated ELF so it can be automatically removed
+ during tearDown.
+ """
+ yaml_base, ext = os.path.splitext(yaml_path)
+ elf_path = "%s.elf" % yaml_base
+ yaml2elf(yaml_path, elf_path)
+ self.temp_files.append(elf_path)
+ return self.dbg.CreateTarget(elf_path)
+
+ def connect(self, target):
+ """
+ Create a process by connecting to the mock GDB server.
+
+ Includes assertions that the process was successfully created.
+ """
+ listener = self.dbg.GetListener()
+ error = lldb.SBError()
+ url = "connect://localhost:%d" % self.server.port
+ process = target.ConnectRemote(listener, url, "gdb-remote", error)
+ self.assertTrue(error.Success(), error.description)
+ self.assertTrue(process, PROCESS_IS_VALID)
+
+ def assertPacketLogContains(self, packets):
+ """
+ Assert that the mock server's packet log contains the given packets.
+
+ The packet log includes all packets sent by the client and received
+ by the server. This fuction makes it easy to verify that the client
+ sent the expected packets to the server.
+
+ The check does not require that the packets be consecutive, but does
+ require that they are ordered in the log as they ordered in the arg.
+ """
+ i = 0
+ j = 0
+ log = self.server.responder.packetLog
+ while i < len(packets) and j < len(log):
+ if log[j] == packets[i]:
+ i += 1
+ j += 1
+ if i < len(packets):
+ self.fail("Did not receive: %s\n\t%s" % (packets[i],
+ '\n\t'.join(log[-10:])))
Index: packages/Python/lldbsuite/test/functionalities/gdb_remote_client/a.yaml
===================================================================
--- /dev/null
+++ packages/Python/lldbsuite/test/functionalities/gdb_remote_client/a.yaml
@@ -0,0 +1,34 @@
+!ELF
+FileHeader:
+ Class: ELFCLASS32
+ Data: ELFDATA2LSB
+ Type: ET_EXEC
+ Machine: EM_ARM
+Sections:
+ - Name: .text
+ Type: SHT_PROGBITS
+ Flags: [ SHF_ALLOC, SHF_EXECINSTR ]
+ Address: 0x1000
+ AddressAlign: 0x4
+ Content: "c3c3c3c3"
+ - Name: .data
+ Type: SHT_PROGBITS
+ Flags: [ SHF_ALLOC ]
+ Address: 0x2000
+ AddressAlign: 0x4
+ Content: "3232"
+ProgramHeaders:
+ - Type: PT_LOAD
+ Flags: [ PF_X, PF_R ]
+ VAddr: 0x1000
+ PAddr: 0x1000
+ Align: 0x4
+ Sections:
+ - Section: .text
+ - Type: PT_LOAD
+ Flags: [ PF_R, PF_W ]
+ VAddr: 0x2000
+ PAddr: 0x1004
+ Align: 0x4
+ Sections:
+ - Section: .data
Index: packages/Python/lldbsuite/test/functionalities/gdb_remote_client/TestGDBRemoteClient.py
===================================================================
--- /dev/null
+++ packages/Python/lldbsuite/test/functionalities/gdb_remote_client/TestGDBRemoteClient.py
@@ -0,0 +1,12 @@
+import lldb
+from lldbsuite.test.lldbtest import *
+from lldbsuite.test.decorators import *
+from gdbclientutils import *
+
+
+class TestGDBRemoteClient(GDBRemoteTestBase):
+
+ def test_connect(self):
+ """Test connecting to a remote gdb server"""
+ target = self.createTarget("a.yaml")
+ process = self.connect(target)
_______________________________________________
lldb-commits mailing list
[email protected]
http://lists.llvm.org/cgi-bin/mailman/listinfo/lldb-commits