Martin Sivák has uploaded a new change for review.

Change subject: Add metadata storage support for raw block device
......................................................................
Add metadata storage support for raw block device

This adds support for using a raw partition or drive as the metadata
storage device. It implements a partitioning schema that supports
multiple metadata files and also allows for future extension.

Once the backend is connected, it uses the device mapper interface to
create the devices for all registered metadata files.

The underlying block device uses the following block based structure.
The first block on the device is a meta block. Each meta block (512B)
contains:

  8B index of the next meta block, or 0 if this is the last one
  64 bytes with the service name (the first byte holds the used length)
  8B + 8B pointers to a data section (start block, length)
  ... more data sections if needed
  16B of zeros acting as a sentinel
  4B CRC32 checksum

If a single service needs many data sections, it is possible to use the
service name multiple times and the data blocks will be concatenated
using device mapper.

Change-Id: Id6ab21fb2075fc59b444b5754ade8a1569b83b18
Signed-off-by: Martin Sivak <msi...@redhat.com>
---
M ovirt_hosted_engine_ha/lib/storage_backends.py
M ovirt_hosted_engine_ha/lib/storage_backends_test.py
2 files changed, 305 insertions(+), 0 deletions(-)


git pull ssh://gerrit.ovirt.org:29418/ovirt-hosted-engine-ha refs/changes/98/25798/1

diff --git a/ovirt_hosted_engine_ha/lib/storage_backends.py b/ovirt_hosted_engine_ha/lib/storage_backends.py
index dda8a96..7bb910c 100644
--- a/ovirt_hosted_engine_ha/lib/storage_backends.py
+++ b/ovirt_hosted_engine_ha/lib/storage_backends.py
@@ -12,6 +12,13 @@
 logger = logging.getLogger(__name__)
 
 
+class BlockBackendCorruptedException(Exception):
+    """
+    Exception raised by BlockBackend when the internal metadata
+    structure reports corrupted data (CRC mismatch).
+    """
+    pass
+
 
 class StorageBackend(object):
     """
@@ -140,3 +147,200 @@
         # reconnect so all links are refreshed
         self.disconnect()
         self.connect()
+
+
+class BlockBackend(StorageBackend):
+    """
+    This uses a pure block device to expose the data. It requires device
+    mapper support to explode the single device into a couple of virtual files.
+
+    This is supposed to be used for devices that are not managed by VDSM
+    or do not use LVM.
+
+    The structure is described using a table that starts at block 0
+    of the block device.
+
+    The format of that block is:
+
+    <the next chained block: 64 bit> - 0 means this is the last block
+    <service name used length: 1 Byte>
+    <service name: 63 Bytes>
+    <data area start block: 64 bit>
+    <data area block length: 64 bit>
+    ... data area records can be repeated if they fit into one block
+    ... if more data area records are needed, one of the chained
+    ... blocks can add them to the same service name
+    128 bit (16B) of zeros as a sentinel
+    32 bit CRC32
+
+    This information is converted to a Device Mapper table and used to
+    create the logical device files.
+    """
+
+    # Binary format specifications, all in network byte order
+    # The name supports only 63 characters
+    BlockInfo = namedtuple("BlockInfo", ("next", "name", "pieces", "valid"))
+    BlockStructHeader = struct.Struct("!Q64p")
+    BlockStructData = struct.Struct("!QQ")
+    BlockCRC = struct.Struct("!L")
+
+    def __init__(self, block_dev_name, dm_prefix):
+        super(BlockBackend, self).__init__()
+        self._block_dev_name = block_dev_name
+        self._dm_prefix = dm_prefix.replace("-", "--")
+        self._services = {}
+
+    def parse_meta_block(self, block):
+        """
+        Parse one info block from the raw byte representation
+        to the BlockInfo namedtuple.
+        """
+        next_block, name = self.BlockStructHeader.unpack_from(block, 0)
+        pieces = []
+        seek = self.BlockStructHeader.size
+        while True:
+            start, size = self.BlockStructData.unpack_from(block, seek)
+            seek += self.BlockStructData.size
+            # end of blocks section sentinel
+            if start == size and size == 0:
+                break
+            pieces.append((start, size))
+        crc = zlib.crc32(block[:seek]) & 0xffffffff
+        # the comma is important, unpack_from returns a single element tuple
+        expected_crc, = self.BlockCRC.unpack_from(block, seek)
+
+        return self.BlockInfo._make((next_block, name,
+                                     tuple(pieces), crc == expected_crc))
+
+    def get_services(self, block_device_fo):
+        """
+        Read all the info blocks from a block device and
+        assemble the services dictionary mapping
+        service name to a list of (data block start, size)
+        tuples.
+        """
+        offset = block_device_fo.tell()
+        services = {}
+        while True:
+            block = block_device_fo.read(self.blocksize)
+            parsed = self.parse_meta_block(block)
+            if not parsed.valid:
+                raise BlockBackendCorruptedException(
+                    "CRC for block ending at %d does not match data!"
+                    % block_device_fo.tell())
+            services.setdefault(parsed.name, [])
+            services[parsed.name].extend(parsed.pieces)
+            if parsed.next == 0:
+                break
+            else:
+                block_device_fo.seek(offset + parsed.next * self.blocksize, 0)
+        return services
+
+    def dm_name(self, service):
+        return "-".join([self._dm_prefix, service.replace("-", "--")])
+
+    def compute_dm_table(self, pieces):
+        """
+        Take a list of tuples in the form of (start, size) and
+        create the string representation of the device mapper table
+        that can be used by dmsetup.
+        """
+        table = []
+        log_start = 0
+        for start, size in pieces:
+            table.append("%d %d linear %s %d"
+                         % (log_start, size, self._block_dev_name, start))
+            log_start += size
+        return "\n".join(table)
+
+    def connect(self):
+        with open(self._block_dev_name, "rb") as bd:
+            self._services = self.get_services(bd)
+
+        for name, pieces in self._services.iteritems():
+            table = self.compute_dm_table(pieces)
+            self.dmcreate(name, table)
+
+    def disconnect(self):
+        for name in self._services:
+            self.dmremove(name)
+
+    def dmcreate(self, name, table, popen=subprocess.Popen):
+        """
+        Call dmsetup create <name> and pass it the table.
+        """
+        name = self.dm_name(name)
+        dm = popen(stdin=subprocess.PIPE, stdout=subprocess.PIPE,
+                   stderr=subprocess.PIPE,
+                   args=["dmsetup", "create", name])
+        logger.debug("Table for %s\n%s", name, table)
+        stdout, stderr = dm.communicate(table)
+        dm.wait()
+        logger.debug("dmcreate %s stdout: %s", name, stdout)
+        logger.debug("dmcreate %s stderr: %s", name, stderr)
+        logger.info("dmcreate %s return code: %d", name, dm.returncode)
+
+    def dmremove(self, name, popen=subprocess.Popen):
+        """
+        Call dmsetup remove to destroy the device.
+        """
+        name = self.dm_name(name)
+        dm = popen(stdin=subprocess.PIPE, stdout=subprocess.PIPE,
+                   stderr=subprocess.PIPE,
+                   args=["dmsetup", "remove", name])
+        stdout, stderr = dm.communicate()
+
+        dm.wait()
+        logger.debug("dmremove %s stdout: %s", name, stdout)
+        logger.debug("dmremove %s stderr: %s", name, stderr)
+        logger.info("dmremove %s return code: %d", name, dm.returncode)
+
+    def filename(self, service):
+        if service not in self._services:
+            return None
+        else:
+            return os.path.join("/dev/mapper", self.dm_name(service)), 0
+
+    def create_info_blocks(self, service_map):
+        def bc(size):
+            """
+            Return the number of blocks needed to accommodate
+            size bytes.
+            """
+            return int(math.ceil(size / float(self._blocksize)))
+
+        # The first len(service_map) blocks will contain
+        # the information about services and their data locations
+        data_start = len(service_map)
+        info_blocks = []
+
+        # Linearize the list, put smaller services before bigger ones
+        service_list = service_map.items()
+        service_list.sort(key=itemgetter(1))
+
+        # Create the list of next ids that starts with 1, goes to the last
+        # index (size - 1) and then ends with 0
+        next_links = range(1, data_start) + [0]
+        for next_id, (service, size) in zip(next_links, service_list):
+            block_len = bc(size)
+            raw_data = StringIO()
+            raw_data.write(self.BlockStructHeader.pack(next_id, service))
+            raw_data.write(self.BlockStructData.pack(data_start, block_len))
+            raw_data.write(self.BlockStructData.pack(0, 0))
+            crc = zlib.crc32(raw_data.getvalue()) & 0xffffffff
+            raw_data.write(self.BlockCRC.pack(crc))
+            info_blocks.append(raw_data.getvalue())
+            data_start += block_len
+
+        return info_blocks
+
+    def create(self, service_map):
+        info_blocks = self.create_info_blocks(service_map)
+        with open(self._block_dev_name, "r+b") as dev:
+            for idx, b in enumerate(info_blocks):
+                position = idx * self._blocksize
+                dev.seek(position)
+                dev.write(b)
+
+        self.disconnect()
+        self.connect()
diff --git a/ovirt_hosted_engine_ha/lib/storage_backends_test.py b/ovirt_hosted_engine_ha/lib/storage_backends_test.py
index e69de29..03fb131 100644
--- a/ovirt_hosted_engine_ha/lib/storage_backends_test.py
+++ b/ovirt_hosted_engine_ha/lib/storage_backends_test.py
@@ -0,0 +1,101 @@
+import unittest
+import cStringIO as StringIO
+import struct
+import zlib
+
+from .storage_backends import BlockBackend
+
+
+class StorageBackendTests(unittest.TestCase):
+
+    def test_single_bad_block_decode(self):
+        raw = struct.pack("!Q64pQQQQQQL",
+                          1, "test",
+                          1, 100,
+                          102, 100,
+                          0, 0,
+                          0)
+        b = BlockBackend("/dev/null", "test-1")
+        block = b.parse_meta_block(raw)
+        self.assertEqual(block, BlockBackend.BlockInfo(
+            1, "test", ((1, 100), (102, 100)), False))
+
+    def test_service_creation(self):
+        b = BlockBackend("/dev/null", "test-1")
+        blocks = b.create_info_blocks({"test1": 300,
+                                       "test2": 512,
+                                       "test3": 1024*1024*50})
+
+        self.assertEqual(3, len(blocks))
+
+        test1 = struct.pack("!Q64pQQQQ",
+                            1, "test1",
+                            3, 1,
+                            0, 0)
+        test1crc = struct.pack("!L", zlib.crc32(test1) & 0xffffffff)
+        test2 = struct.pack("!Q64pQQQQ",
+                            2, "test2",
+                            4, 1,
+                            0, 0)
+        test2crc = struct.pack("!L", zlib.crc32(test2) & 0xffffffff)
+        test3 = struct.pack("!Q64pQQQQ",
+                            0, "test3",
+                            5, 102400,
+                            0, 0)
+        test3crc = struct.pack("!L", zlib.crc32(test3) & 0xffffffff)
+
+        expected = [
+            test1 + test1crc,
+            test2 + test2crc,
+            test3 + test3crc
+        ]
+
+        self.assertEqual(expected, blocks)
+
+    def test_single_good_block_decode(self):
+        raw = struct.pack("!Q64pQQQQQQ",
+                          1, "test",
+                          1, 100,
+                          102, 100,
+                          0, 0)
+        rawcrc = struct.pack("!L", zlib.crc32(raw) & 0xffffffff)
+        b = BlockBackend("/dev/null", "test-1")
+        block = b.parse_meta_block(raw+rawcrc)
+        self.assertEqual(block, BlockBackend.BlockInfo(
+            1, "test", ((1, 100), (102, 100)), True))
+
+    def test_dm_table(self):
+        block = BlockBackend.BlockInfo(1, "test", ((1, 100), (102, 100)), True)
+        b = BlockBackend("/dev/null", "test-1")
+        table = b.compute_dm_table(block.pieces)
+        expected = ("0 100 linear /dev/null 1\n"
+                    "100 100 linear /dev/null 102")
+        self.assertEqual(expected, table)
+
+    def test_get_services(self):
+        raw1 = struct.pack("!Q64pQQQQQQ",
+                           1, "test",
+                           1, 100,
+                           102, 100,
+                           0, 0)
+        raw1crc = struct.pack("!L",
+                              zlib.crc32(raw1) & 0xffffffff)
+
+        raw2 = struct.pack("!Q64pQQQQQQ",
+                           0, "test2",
+                           2, 200,
+                           202, 200,
+                           0, 0)
+        raw2crc = struct.pack("!L", zlib.crc32(raw2) & 0xffffffff)
+
+        b = BlockBackend("/dev/null", "test-1")
+        blockdev = StringIO.StringIO()
+        blockdev.write(raw1)
+        blockdev.write(raw1crc)
+        blockdev.seek(b.blocksize)
+        blockdev.write(raw2)
+        blockdev.write(raw2crc)
+        blockdev.seek(0)
+        expected = {'test': [(1, 100), (102, 100)],
+                    'test2': [(2, 200), (202, 200)]}
+        services = b.get_services(blockdev)
+        self.assertEqual(expected, services)

--
To view, visit http://gerrit.ovirt.org/25798

Gerrit-MessageType: newchange
Gerrit-Change-Id: Id6ab21fb2075fc59b444b5754ade8a1569b83b18
Gerrit-PatchSet: 1
Gerrit-Project: ovirt-hosted-engine-ha
Gerrit-Branch: master
Gerrit-Owner: Martin Sivák <msi...@redhat.com>
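A minimal sketch of how the new BlockBackend is expected to be driven end to
end. The device path /dev/sdb1, the "he" prefix and the two service names are
made-up values for illustration only and are not part of the patch; running
this needs a scratch block device that may be overwritten, a working dmsetup
binary and enough privileges to create device-mapper devices.

# Illustrative only: /dev/sdb1, the "he" prefix and the service names
# below are hypothetical; any real metadata on the device is overwritten.
from ovirt_hosted_engine_ha.lib.storage_backends import BlockBackend

backend = BlockBackend("/dev/sdb1", "he")

# Write the meta blocks for two services; sizes are in bytes and get
# rounded up to whole 512B blocks by create_info_blocks().
backend.create({"hosted-engine.metadata": 1024 * 1024,
                "hosted-engine.lockspace": 1024 * 1024})

# create() reconnects, so the device-mapper targets now exist and each
# registered service resolves to a device node under /dev/mapper plus
# a zero offset.
path, offset = backend.filename("hosted-engine.metadata")
print path, offset

# Tear the device-mapper devices down again when done.
backend.disconnect()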