Martin Sivák has uploaded a new change for review.

Change subject: Update the services to use the new storage backend classes
......................................................................

Update the services to use the new storage backend classes

Agent no longer cares about the exact filesystem path. It
only selects the proper metadata backend (fs or block) and
passes the necessary arguments for the constructor using the
set_storage_domain call.

The broker associates the storage domain with the agent's
connection and uses it automatically to process all requests
by this agent connection. This allows us to have multiple agents
with separate storage domains in the future.

If the actual path to the device/file is still needed then
the broker will return it from a new get_service_path call.

The default configuration for now is fs based backend (NFS,
iSCSI or Gluster).

The api of client.get_*_direct methods changes to use the
uuid and storage domain type so the path computation is done
only on single place.

Change-Id: I0ad1567c53af561e918be69b8f1b33f75b77fc05
Signed-off-by: Martin Sivak <msi...@redhat.com>
---
M ovirt_hosted_engine_ha/agent/constants.py.in
M ovirt_hosted_engine_ha/agent/hosted_engine.py
M ovirt_hosted_engine_ha/broker/listener.py
M ovirt_hosted_engine_ha/broker/storage_broker.py
M ovirt_hosted_engine_ha/client/client.py
M ovirt_hosted_engine_ha/lib/brokerlink.py
6 files changed, 160 insertions(+), 56 deletions(-)


  git pull ssh://gerrit.ovirt.org:29418/ovirt-hosted-engine-ha 
refs/changes/00/25800/1

diff --git a/ovirt_hosted_engine_ha/agent/constants.py.in 
b/ovirt_hosted_engine_ha/agent/constants.py.in
index 6e8166c..eb06f89 100644
--- a/ovirt_hosted_engine_ha/agent/constants.py.in
+++ b/ovirt_hosted_engine_ha/agent/constants.py.in
@@ -41,6 +41,8 @@
 
 METADATA_BLOCK_BYTES = 512
 SERVICE_TYPE = 'hosted-engine'
+MD_EXTENSION = '.metadata'
+LOCKSPACE_EXTENSION = '.lockspace'
 BROKER_CONNECTION_RETRIES = 10
 HOST_ALIVE_TIMEOUT_SECS = 60
 ENGINE_RETRY_EXPIRATION_SECS = 600
diff --git a/ovirt_hosted_engine_ha/agent/hosted_engine.py 
b/ovirt_hosted_engine_ha/agent/hosted_engine.py
index 57ae407..8eafb0f 100644
--- a/ovirt_hosted_engine_ha/agent/hosted_engine.py
+++ b/ovirt_hosted_engine_ha/agent/hosted_engine.py
@@ -153,8 +153,6 @@
         })
 
         self._sd_path = None
-        self._metadata_path = None
-
         self._sanlock_initialized = False
 
     @property
@@ -295,8 +293,12 @@
         error_count = 0
 
         # make sure everything is initialized
-        self._initialize_broker()
+        # VDSM has to be initialized first, because it prepares the
+        # storage domain connection
+        # Broker then initializes the pieces needed for metadata and leases
+        # which are then used by sanlock
         self._initialize_vdsm()
+        self._initialize_broker()
         self._initialize_sanlock()
         self._initialize_domain_monitor()
 
@@ -312,8 +314,8 @@
 
             try:
                 # make sure everything is still initialized
-                self._initialize_broker()
                 self._initialize_vdsm()
+                self._initialize_broker()
                 self._initialize_sanlock()
                 self._initialize_domain_monitor()
 
@@ -383,6 +385,14 @@
                 raise
             else:
                 self._local_monitors[m['field']] = lm
+
+        # register storage domain info
+        sd_uuid = self._config.get(config.ENGINE, config.SD_UUID)
+        dom_type = self._config.get(config.ENGINE, config.DOMAIN_TYPE)
+        self._broker.set_storage_domain("fs",
+                                        sd_uuid=sd_uuid,
+                                        dom_type=dom_type)
+
         self._log.info("Broker initialized, all submonitors started")
 
     def _initialize_vdsm(self):
@@ -440,10 +450,8 @@
 
     def _initialize_sanlock(self):
         self._cond_start_service('sanlock')
-
-        self._metadata_dir = env_path.get_metadata_path(self._config)
-        lease_file = os.path.join(self._metadata_dir,
-                                  constants.SERVICE_TYPE + '.lockspace')
+        lease_file = self._broker.get_service_file(
+            constants.SERVICE_TYPE + constants.LOCKSPACE_EXTENSION)
         if not self._sanlock_initialized:
             lvl = logging.INFO
         else:
@@ -612,15 +620,13 @@
 
     def _push_to_storage(self, blocks):
         self._broker.put_stats_on_storage(
-            self._metadata_dir,
-            constants.SERVICE_TYPE,
+            constants.SERVICE_TYPE + constants.MD_EXTENSION,
             self._config.get(config.ENGINE, config.HOST_ID),
             blocks)
 
     def collect_stats(self):
         all_stats = self._broker.get_stats_from_storage(
-            self._metadata_dir,
-            constants.SERVICE_TYPE)
+            constants.SERVICE_TYPE + constants.MD_EXTENSION)
 
         data = {
             # Flag is set if the local agent discovers metadata too new for it
@@ -644,8 +650,7 @@
         }
 
         all_stats = self._broker.get_stats_from_storage(
-            self._metadata_dir,
-            constants.SERVICE_TYPE)
+            constants.SERVICE_TYPE + constants.MD_EXTENSION)
 
         # host_id 0 is a special case, representing global metadata
         if all_stats and 0 in all_stats:
diff --git a/ovirt_hosted_engine_ha/broker/listener.py 
b/ovirt_hosted_engine_ha/broker/listener.py
index 9b6a0d3..ceb2eda 100644
--- a/ovirt_hosted_engine_ha/broker/listener.py
+++ b/ovirt_hosted_engine_ha/broker/listener.py
@@ -162,7 +162,8 @@
                 data = util.socket_readline(self.request, self._log)
                 self._log.debug("Input: %s", data)
                 try:
-                    response = "success " + self._dispatch(data)
+                    response = ("success "
+                                + self._dispatch(self.client_address, data))
                 except RequestError as e:
                     response = "failure " + format(str(e))
                 self._log.debug("Response: %s", response)
@@ -210,6 +211,11 @@
                                 + "%d - %s", id, str(e))
         self._remove_monitor_conn_entry()
 
+        # cleanup storage
+        with self.server.sp_listener.storage_broker_instance_access_lock:
+            self.server.sp_listener.storage_broker_instance \
+                .cleanup(self.client_address)
+
         try:
             SocketServer.BaseRequestHandler.finish(self)
         except socket.error as e:
@@ -217,7 +223,7 @@
                 self._log.error("Error while closing connection",
                                 exc_info=True)
 
-    def _dispatch(self, data):
+    def _dispatch(self, client, data):
         """
         Parses and dispatches a request to the appropriate subsystem.
 
@@ -258,13 +264,26 @@
             options = self._get_options(tokens)
             with self.server.sp_listener.storage_broker_instance_access_lock:
                 stats = self.server.sp_listener.storage_broker_instance \
-                    .get_all_stats_for_service_type(**options)
+                    .get_all_stats_for_service_type(client, **options)
             return stats
         elif type == 'put-stats':
             options = self._get_options(tokens)
             with self.server.sp_listener.storage_broker_instance_access_lock:
                 self.server.sp_listener.storage_broker_instance \
-                    .put_stats(**options)
+                    .put_stats(client, **options)
+            return "ok"
+        elif type == 'service-path':
+            options = self._get_options(tokens)
+            with self.server.sp_listener.storage_broker_instance_access_lock:
+                self.server.sp_listener.storage_broker_instance \
+                    .get_service_path(client, **options)
+            return "ok"
+        elif type == 'set-storage-domain':
+            sd_type = int(tokens.pop(0))
+            options = self._get_options(tokens)
+            with self.server.sp_listener.storage_broker_instance_access_lock:
+                self.server.sp_listener.storage_broker_instance \
+                    .set_storage_domain(client, sd_type, **options)
             return "ok"
         elif type == 'notify':
             options = self._get_options(tokens)
diff --git a/ovirt_hosted_engine_ha/broker/storage_broker.py 
b/ovirt_hosted_engine_ha/broker/storage_broker.py
index ade92f0..a33ce1d 100644
--- a/ovirt_hosted_engine_ha/broker/storage_broker.py
+++ b/ovirt_hosted_engine_ha/broker/storage_broker.py
@@ -25,21 +25,45 @@
 
 from ..env import constants
 from ..lib.exceptions import RequestError
-from ..lib import util
+from ..lib.storage_backends import FilesystemBackend, BlockBackend
 
 
 class StorageBroker(object):
+
+    DOMAINTYPES = {
+        "fs": FilesystemBackend,
+        "block": BlockBackend
+    }
+
     def __init__(self):
         self._log = logging.getLogger("%s.StorageBroker" % __name__)
         self._storage_access_lock = threading.Lock()
+        self._backends = {}
 
-    def get_all_stats_for_service_type(self, storage_dir, service_type):
+    def set_storage_domain(self, client, sd_type, **kwargs):
+        """
+        The first thing any new broker client should do is to configure
+        the storage it wants to use. Client is arbitrary hashable structure,
+        but usually is (host, ip) of the agent that opened the connection
+        to the broker. The client value is provided by the broker logic.
+
+        :param sd_type: The type of backend the clients want to use
+        :type sd_type: Currently the only supported values are "fs" and "block"
+        """
+        if client in self._backends:
+            self._backends[client].disconnect()
+            del self._backends[client]
+        self._backends[client] = self.DOMAINTYPES[sd_type](**kwargs)
+        self._backends[client].connect()
+
+    def get_all_stats_for_service_type(self, client, service_type):
         """
         Reads all files in storage_dir for the given service_type, returning a
         space-delimited string of "<host_id>=<hex data>" for each host.
         """
-        d = self.get_raw_stats_for_service_type(storage_dir, service_type)
+        d = self.get_raw_stats_for_service_type(client, service_type)
         str_list = []
+
         for host_id in sorted(d.keys()):
             hex_data = base64.b16encode(d.get(host_id))
             self._log.debug("Read for host id %d: %s",
@@ -47,7 +71,7 @@
             str_list.append("{0}={1}".format(host_id, hex_data))
         return ' '.join(str_list)
 
-    def get_raw_stats_for_service_type(self, storage_dir, service_type):
+    def get_raw_stats_for_service_type(self, client, service_type):
         """
         Reads all files in storage_dir for the given service_type, returning a
         dict of "host_id: data" for each host
@@ -55,9 +79,14 @@
         Note: this method is called from the client as well as from
         self.get_all_stats_for_service_type().
         """
-        self._log.debug("Getting stats for service %s from %s",
-                        service_type, storage_dir)
-        path = os.path.join(storage_dir, self._get_filename(service_type))
+        try:
+            path, offset = self._backends[client].filename(service_type)
+            self._log.debug("Getting stats for service %s from %s with"
+                            " offset %d",
+                            service_type, path, offset)
+        except KeyError:
+            self._log.error("No storage configured for %s", client)
+            return {}
 
         # Use direct I/O if possible, to avoid the local filesystem cache
         # from hiding metadata file updates from other hosts.  For NFS, we
@@ -72,6 +101,7 @@
         try:
             with self._storage_access_lock:
                 f = os.open(path, direct_flag | os.O_RDONLY)
+                os.lseek(f, offset, os.SEEK_SET)
                 data = os.read(f, read_size)
                 os.close(f)
         except IOError as e:
@@ -83,7 +113,7 @@
                      for i in range(0, len(data), bs)
                      if data[i] != '\0'))
 
-    def put_stats(self, storage_dir, service_type, host_id, data):
+    def put_stats(self, client, service_type, host_id, data):
         """
         Writes to the storage in file <storage_dir>/<service-type>.metadata,
         storing the hex string data (e.g. 01bc4f[...]) in binary format.
@@ -96,18 +126,21 @@
         the file after the write.
         """
         host_id = int(host_id)
-        path = os.path.join(storage_dir, self._get_filename(service_type))
-        offset = host_id * constants.HOST_SEGMENT_BYTES
-        self._log.debug("Writing stats for service %s, host id %d"
-                        " to file %s, offset %d",
-                        service_type, host_id, path, offset)
+        try:
+            path, offset = self._backends[client].filename(service_type)
+            offset += host_id * constants.HOST_SEGMENT_BYTES
+            self._log.debug("Writing stats for service %s, host id %d"
+                            " to file %s, offset %d",
+                            service_type, host_id, path, offset)
+        except KeyError:
+            self._log.error("No storage configured for %s", client)
+            return None
 
         byte_data = base64.b16decode(data)
         byte_data = byte_data.ljust(constants.HOST_SEGMENT_BYTES, '\0')
         with self._storage_access_lock:
             f = None
             try:
-                util.mkdir_recursive(storage_dir)
                 f = io.open(path, "r+b")
                 f.seek(offset, os.SEEK_SET)
                 f.write(byte_data)
@@ -122,7 +155,22 @@
 
         self._log.debug("Finished")
 
-    def _get_filename(self, service_type):
-        # Nothing special yet
-        # FIXME should escape special chars before production deployment
-        return "{0}.{1}".format(service_type, constants.MD_EXTENSION)
+    def get_service_path(self, client, service):
+        """
+        Returns the full path to a file or device that holds the data
+        for specified service.
+
+        Client ID is provided by the broker logic.
+        """
+        return self._backends[client].filename(service)[0]
+
+    def cleanup(self, client):
+        """
+        After client (like ha_agent) disconnects the storage backend
+        needs to be freed properly.
+
+        Client ID is provided by the broker logic.
+        """
+        if client in self._backends:
+            self._backends[client].disconnect()
+            del self._backends[client]
diff --git a/ovirt_hosted_engine_ha/client/client.py 
b/ovirt_hosted_engine_ha/client/client.py
index 508ff0a..40f7957 100644
--- a/ovirt_hosted_engine_ha/client/client.py
+++ b/ovirt_hosted_engine_ha/client/client.py
@@ -18,7 +18,6 @@
 #
 
 import logging
-import os
 import time
 
 from ..agent import constants as agent_constants
@@ -81,13 +80,12 @@
             self._config = config.Config()
         broker = brokerlink.BrokerLink()
         with broker.connection():
-            stats = broker.get_stats_from_storage(
-                path.get_metadata_path(self._config),
-                constants.SERVICE_TYPE)
+            self._configure_broker_conn(broker)
+            stats = 
broker.get_stats_from_storage(constants.SERVICE_TYPE+agent_constants.MD_EXTENSION)
 
         return self._parse_stats(stats, mode)
 
-    def get_all_stats_direct(self, dom_path, service_type, mode=StatModes.ALL):
+    def get_all_stats_direct(self, sd_uuid, dom_type, service_type, 
mode=StatModes.ALL):
         """
         Like get_all_stats(), but bypasses broker by directly accessing
         storage.
@@ -95,8 +93,9 @@
         from ..broker import storage_broker
 
         sb = storage_broker.StorageBroker()
-        path = os.path.join(dom_path, constants.SD_METADATA_DIR)
-        stats = sb.get_raw_stats_for_service_type(path, service_type)
+        sb.set_storage_domain("client", "fs", sd_uuid=sd_uuid, 
dom_type=dom_type)
+        path = sb.get_service_path("client", 
constants.SERVICE_TYPE+agent_constants.MD_EXTENSION)
+        stats = sb.get_raw_stats_for_service_type("client", service_type)
 
         return self._parse_stats(stats, mode)
 
@@ -146,6 +145,15 @@
             service_type,
             self.StatModes.HOST)
 
+    def _configure_broker_conn(self, broker):
+        if self._config is None:
+            self._config = config.Config()
+        sd_uuid = self._config.get(config.ENGINE, config.SD_UUID)
+        dom_type = self._config.get(config.ENGINE, config.DOMAIN_TYPE)
+        broker.set_storage_domain("fs",
+                                  sd_uuid=sd_uuid,
+                                  dom_type=dom_type)
+
     def set_global_md_flag(self, flag, value):
         """
         Connects to HA broker and sets flags in global metadata, leaving
@@ -164,14 +172,10 @@
         else:
             put_val = value
 
-        if self._config is None:
-            self._config = config.Config()
-
         broker = brokerlink.BrokerLink()
         with broker.connection():
-            all_stats = broker.get_stats_from_storage(
-                path.get_metadata_path(self._config),
-                constants.SERVICE_TYPE)
+            self._configure_broker_conn(broker)
+            all_stats = 
broker.get_stats_from_storage(constants.SERVICE_TYPE+agent_constants.MD_EXTENSION)
 
             global_stats = all_stats.get(0)
             if global_stats and len(global_stats):
@@ -184,7 +188,7 @@
             block = metadata.create_global_metadata_from_dict(md_dict)
             broker.put_stats_on_storage(
                 path.get_metadata_path(self._config),
-                constants.SERVICE_TYPE,
+                constants.SERVICE_TYPE+agent_constants.MD_EXTENSION,
                 0,
                 block)
 
@@ -200,9 +204,8 @@
         host_id = int(self._config.get(config.ENGINE, config.HOST_ID))
         broker = brokerlink.BrokerLink()
         with broker.connection():
-            stats = broker.get_stats_from_storage(
-                path.get_metadata_path(self._config),
-                constants.SERVICE_TYPE)
+            self._configure_broker_conn(broker)
+            stats = 
broker.get_stats_from_storage(constants.SERVICE_TYPE+agent_constants.MD_EXTENSION)
 
         score = 0
         if host_id in stats:
diff --git a/ovirt_hosted_engine_ha/lib/brokerlink.py 
b/ovirt_hosted_engine_ha/lib/brokerlink.py
index bb636ea..e78af79 100644
--- a/ovirt_hosted_engine_ha/lib/brokerlink.py
+++ b/ovirt_hosted_engine_ha/lib/brokerlink.py
@@ -148,6 +148,33 @@
         self._log.debug("Success, status %s", response)
         return response
 
+    def get_service_path(self, service):
+        request = "service-path {0}".format(service)
+        try:
+            response = self._checked_communicate(request)
+        except Exception as e:
+            self._log.error("Exception getting service path: %s", str(e))
+            raise RequestError("Failed to get service path: {0}"
+                               .format(str(e)))
+        self._log.debug("Success, service path %s", response)
+        return response
+
+    def set_storage_domain(self, sd_type, **options):
+        request = ["set-storage-domain {0}".format(sd_type)]
+        for (k, v) in options.iteritems():
+            request.append("{0}={1}".format(k, str(v)))
+        request = " ".join(request)
+
+        try:
+            response = self._checked_communicate(request)
+        except Exception as e:
+            raise RequestError("Failed to set storage domain {0}, "
+                               "options {1}: {2}"
+                               .format(sd_type, options, e))
+
+        self._log.info("Success, id %s", response)
+        return response
+
     def put_stats_on_storage(self, storage_dir, service_type, host_id, data):
         """
         Puts data on the shared storage according to the parameters.
@@ -161,13 +188,13 @@
                    .format(storage_dir, service_type, host_id, hex_data))
         self._checked_communicate(request)
 
-    def get_stats_from_storage(self, storage_dir, service_type):
+    def get_stats_from_storage(self, service_type):
         """
         Returns data from the shared storage for all hosts of the specified
         service type.
         """
-        request = ("get-stats storage_dir={0} service_type={1}"
-                   .format(storage_dir, service_type))
+        request = ("get-stats service_type={0}"
+                   .format(service_type))
         result = self._checked_communicate(request)
 
         tokens = result.split()


-- 
To view, visit http://gerrit.ovirt.org/25800
To unsubscribe, visit http://gerrit.ovirt.org/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I0ad1567c53af561e918be69b8f1b33f75b77fc05
Gerrit-PatchSet: 1
Gerrit-Project: ovirt-hosted-engine-ha
Gerrit-Branch: master
Gerrit-Owner: Martin Sivák <msi...@redhat.com>
_______________________________________________
Engine-patches mailing list
Engine-patches@ovirt.org
http://lists.ovirt.org/mailman/listinfo/engine-patches

Reply via email to