Martin Peřina has uploaded a new change for review.

Change subject: core: Introduce standalone fence_kdump listener
......................................................................

core: Introduce standalone fence_kdump listener

Introduces core parts of standalone fence_kdump listener.

Change-Id: Ieec3bad47bbba860a52a9ff4e2eb7f61277f4e36
Bug-Url: https://bugzilla.redhat.com/1079821
Signed-off-by: Martin Perina <mper...@redhat.com>
---
M Makefile
A packaging/services/ovirt-fence-kdump-listener/__init__.py
A packaging/services/ovirt-fence-kdump-listener/config.py.in
A packaging/services/ovirt-fence-kdump-listener/db.py
A packaging/services/ovirt-fence-kdump-listener/flowterminator.py
A packaging/services/ovirt-fence-kdump-listener/heartbeat.py
A packaging/services/ovirt-fence-kdump-listener/listener.py
A packaging/services/ovirt-fence-kdump-listener/logbase.py
A packaging/services/ovirt-fence-kdump-listener/ovirt-fence-kdump-listener.py
A packaging/services/ovirt-fence-kdump-listener/qconsumer.py
10 files changed, 632 insertions(+), 0 deletions(-)


  git pull ssh://gerrit.ovirt.org:29418/ovirt-engine refs/changes/01/27201/1

diff --git a/Makefile b/Makefile
index d0adbc5..33c4398 100644
--- a/Makefile
+++ b/Makefile
@@ -181,6 +181,7 @@
        packaging/services/ovirt-websocket-proxy/ovirt-websocket-proxy.conf \
        packaging/services/ovirt-websocket-proxy/ovirt-websocket-proxy.systemd \
        packaging/services/ovirt-websocket-proxy/ovirt-websocket-proxy.sysv \
+       packaging/services/ovirt-fence-kdump-listener/config.py \
        packaging/setup/bin/ovirt-engine-setup.env \
        packaging/setup/ovirt_engine_setup/config.py \
        packaging/sys-etc/logrotate.d/ovirt-engine \
diff --git a/packaging/services/ovirt-fence-kdump-listener/__init__.py 
b/packaging/services/ovirt-fence-kdump-listener/__init__.py
new file mode 100644
index 0000000..aa5eb8e
--- /dev/null
+++ b/packaging/services/ovirt-fence-kdump-listener/__init__.py
@@ -0,0 +1,19 @@
+# Copyright 2012 Red Hat
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+__all__ = []
+
+
+# vim: expandtab tabstop=4 shiftwidth=4
diff --git a/packaging/services/ovirt-fence-kdump-listener/config.py.in 
b/packaging/services/ovirt-fence-kdump-listener/config.py.in
new file mode 100644
index 0000000..c8e07f2
--- /dev/null
+++ b/packaging/services/ovirt-fence-kdump-listener/config.py.in
@@ -0,0 +1,25 @@
+# Copyright 2012 Red Hat
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+DEV_PYTHON_DIR = '@DEV_PYTHON_DIR@'
+ENGINE_VARS = '@ENGINE_VARS@'
+
+
+import sys
+if DEV_PYTHON_DIR:
+    sys.path.append(DEV_PYTHON_DIR)
+
+
+# vim: expandtab tabstop=4 shiftwidth=4
diff --git a/packaging/services/ovirt-fence-kdump-listener/db.py 
b/packaging/services/ovirt-fence-kdump-listener/db.py
new file mode 100644
index 0000000..297f36e
--- /dev/null
+++ b/packaging/services/ovirt-fence-kdump-listener/db.py
@@ -0,0 +1,119 @@
+import psycopg2
+
+import logbase
+
+
+@logbase.Logable()
+class Manager(object):
+    def __init__(
+            self,
+            host,
+            port,
+            database,
+            username,
+            password,
+            secured,
+            secure_validation,
+    ):
+        self._db_dsn = (
+            "host=%s port=%s dbname=%s user=%s password=%s sslmode=%s" % (
+                host,
+                port,
+                database,
+                username,
+                password,
+                Manager._get_ssl_mode(secured, secure_validation),
+            )
+        )
+        self._logger.debug("Database dsn: '%s'", self._db_dsn)
+
+    @staticmethod
+    def _get_ssl_mode(secured, secure_validation):
+        sslmode = "disable"
+        if secured == "True":
+            if secure_validation == "True":
+                sslmode = "verify-full"
+            else:
+                sslmode = "require"
+        return sslmode
+
+    @staticmethod
+    def params2message(host, received, status):
+        """From specified arguments creates a message with format
+            {
+               "host": host,
+               "received": received,
+               "status": status
+            }
+        """
+        return {
+            "host": host,
+            "received": received,
+            "status": status,
+        }
+
+    @staticmethod
+    def record2message(record):
+        """From specified db record creates a message with format
+            {
+               "host": record[0],
+               "received": record[1],
+               "status": record[2]
+            }
+        """
+        return Manager.params2message(
+            record[0],
+            record[1],
+            record[2],
+        ) if record is not None else None
+
+    def _load_config_value(self, connection, name, version="general"):
+        cur = connection.cursor()
+        cur.execute(
+            "SELECT option_value FROM vdc_options"
+            "  WHERE option_name=%(name)s"
+            "  AND version=%(version)s",
+            name,
+            version,
+        )
+        records = cur.fetchmany()
+        return records[0] if records else None
+
+    def init_config(self):
+        self._config = {}
+
+        conn = self.open_connection()
+        self._config["NextKdumpTimeout"] = self._load_config_value(
+            conn,
+            "NextKdumpTimeout",
+        )
+        self._config["KdumpFinishedTimeout"] = self._load_config_value(
+            conn,
+            "KdumpFinishedTimeout",
+        )
+        self._config["FenceKdumpListenerHost"] = self._load_config_value(
+            conn,
+            "FenceKdumpListenerHost",
+            )
+        self._config["FenceKdumpListenerPort"] = self._load_config_value(
+            conn,
+            "FenceKdumpListenerPort",
+        )
+
+    def get_config_value(self, name):
+        return self._config["name"]
+
+    def open_connection(self):
+        return psycopg2.connect(self._db_dsn)
+
+    def close_connection(self, connection):
+        """Tries to close specified db connection. If error appeared,
+           it's logged"""
+        try:
+            if connection is not None:
+                connection.close()
+        except (psycopg2.Warning, psycopg2.Error) as e:
+            self._logger.error(
+                "Error closing connection: %s",
+                e,
+            )
diff --git a/packaging/services/ovirt-fence-kdump-listener/flowterminator.py 
b/packaging/services/ovirt-fence-kdump-listener/flowterminator.py
new file mode 100644
index 0000000..b0187a2
--- /dev/null
+++ b/packaging/services/ovirt-fence-kdump-listener/flowterminator.py
@@ -0,0 +1,84 @@
+import datetime
+import dateutil.tz
+import psycopg2
+import threading
+import time
+
+import db
+import logbase
+
+
+@logbase.Logable()
+class KdumpFlowTerminator(threading.Thread):
+    """Creates record with status "finished" for finished kdump flows"""
+
+    def __init__(self, queue, db_manager, kdump_finished_timeout):
+        super(KdumpFlowTerminator, self).__init__()
+        self.daemon = True
+        self._queue = queue
+        self._db_manager = db_manager
+        self.kdump_finished_timeout = datetime.timedelta(
+            seconds=self._db_manager.get_config_value(
+                "KdumpFinishedTimeout"
+            )
+        )
+
+    @staticmethod
+    def _get_unfinished_flows(connection):
+        """Returns list of db records as messages for hosts which
+           kdump flow hasn't been finished yet"""
+        cur = connection.cursor()
+        cur.execute(
+            "SELECT fkm1.host, fkm1.received, fkm1.status"
+            "  FROM fence_kdump_messages fkm1"
+            "  JOIN ("
+            "    SELECT host, MAX(received) AS latest"
+            "    FROM fence_kdump_messages"
+            "    GROUP BY host"
+            "  ) fkm2"
+            "  ON fkm1.host = fkm2.host"
+            "    AND fkm1.received = fkm2.latest"
+            "  WHERE fkm1.status <> 'finished'"
+            "    AND fkm1.host <> 'fence_kdump_listener'"
+        )
+
+        result = []
+        for record in cur.fetchall():
+            result.append(db.Manager.record2message(record))
+        cur.close()
+        return result
+
+    def run(self):
+        conn = None
+        while True:
+            try:
+                conn = self._db_manager.open_connection()
+
+                finished = datetime.datetime.now(dateutil.tz.tzlocal())
+                records = KdumpFlowTerminator._get_unfinished_flows(conn)
+                for record in records:
+                    if (
+                        record["received"] + self.kdump_finished_timeout
+                        < finished
+                    ):
+                        msg = db.Manager.params2message(
+                            record["host"],
+                            finished,
+                            "finished",
+                        )
+                        self._logger.debug(
+                            "Message '%s' sent to queue.",
+                            msg,
+                        )
+                        self._queue.put(msg)
+
+            except (psycopg2.Warning, psycopg2.Error) as e:
+                self._logger.error(
+                    "Error on finishing flows: %s",
+                    e,
+                )
+            finally:
+                self._db_manager.close_connection(conn)
+                conn = None
+
+            time.sleep(30)
diff --git a/packaging/services/ovirt-fence-kdump-listener/heartbeat.py 
b/packaging/services/ovirt-fence-kdump-listener/heartbeat.py
new file mode 100644
index 0000000..ea92510
--- /dev/null
+++ b/packaging/services/ovirt-fence-kdump-listener/heartbeat.py
@@ -0,0 +1,57 @@
+import datetime
+import dateutil.tz
+import psycopg2
+import threading
+import time
+
+import db
+import logbase
+
+
+@logbase.Logable()
+class Heartbeat(threading.Thread):
+    """Periodically updates received to current time for record
+       with host 'fence-kdump-listener'"""
+
+    def __init__(self, db_manager):
+        super(Heartbeat, self).__init__()
+        self.daemon = True
+        self._db_manager = db_manager
+
+    @staticmethod
+    def _update_heartbeat(connection, timestamp):
+        cur = connection.cursor()
+        cur.execute(
+            "UPDATE fence_kdump_messages"
+            "  SET received = %s"
+            "  WHERE host = 'fence_kdump_listener'",
+            timestamp,
+        )
+        connection.commit()
+        cur.close()
+
+    def run(self):
+        conn = None
+        while True:
+            try:
+                conn = self._db_manager.open_connection()
+
+                Heartbeat._update_heartbeat(
+                    conn,
+                    datetime.datetime.now(dateutil.tz.tzlocal()),
+                )
+
+            except (psycopg2.Warning, psycopg2.Error) as e:
+                self._logger.error(
+                    "Error updating heartbeat: %s",
+                    e,
+                )
+            finally:
+                self._db_manager.close_connection(conn)
+                conn = None
+
+            time.sleep(
+                self._db_manager.get_config_value(
+                    "FenceKdumpListenerAliveTimeout"
+                )
+            )
diff --git a/packaging/services/ovirt-fence-kdump-listener/listener.py 
b/packaging/services/ovirt-fence-kdump-listener/listener.py
new file mode 100644
index 0000000..0c47a1a
--- /dev/null
+++ b/packaging/services/ovirt-fence-kdump-listener/listener.py
@@ -0,0 +1,66 @@
+import binascii
+import datetime
+import dateutil.tz
+import SocketServer
+import struct
+
+import db
+import logbase
+
+
+@logbase.Logable()
+class FenceKdumpRequestHandler(SocketServer.DatagramRequestHandler):
+    """Magic number fence_kdump message for version 1"""
+    _FK_MSG_MAGIC_V1 = 456141376
+
+    def _parse_number(self, data, offset=0):
+        number = None
+        if data is not None:
+            number = struct.unpack_from("<I", data, offset)[0]
+        return number
+
+    def handle(self):
+        data = self.request[0].strip()
+        received = datetime.datetime.now(dateutil.tz.tzlocal())
+        self._logger.debug(
+            (
+                "Received message '%s' from host='%s' at '%s'."
+            ),
+            binascii.hexlify(data),
+            self.client_address[0],
+            received,
+        )
+
+        magic = self._parse_number(data)
+        version = self._parse_number(data, 4)
+
+        if magic == self._FK_MSG_MAGIC_V1 and version == 1:
+            msg = db.Manager.params2message(
+                self.client_address[0],
+                received,
+                None,
+            )
+            self._logger.debug(
+                "Message '%s' sent to queue.",
+                msg,
+            )
+            self.server.queue.put(msg)
+
+
+@logbase.Logable()
+class FenceKdumpListener(object):
+    def __init__(
+            self,
+            host,
+            port,
+            queue,
+    ):
+        self._queue = queue
+        self._server = SocketServer.ThreadingUDPServer(
+            (host, port),
+            FenceKdumpRequestHandler,
+        )
+        self._server.queue = self._queue
+
+    def start(self):
+        self._server.serve_forever()
diff --git a/packaging/services/ovirt-fence-kdump-listener/logbase.py 
b/packaging/services/ovirt-fence-kdump-listener/logbase.py
new file mode 100644
index 0000000..76ad459
--- /dev/null
+++ b/packaging/services/ovirt-fence-kdump-listener/logbase.py
@@ -0,0 +1,20 @@
+import logging
+
+
+_LOG_NAME = 'ovirt-fence-kdump-listener'
+
+
+class Logable(object):
+    def __call__(self, target):
+        _logger = logging.getLogger(".".join((_LOG_NAME, target.__name__)))
+        setattr(target, "_logger", _logger)
+        return target
+
+
+def setup_logger(filename, verbose=False):
+    """Configures logging to specified filename"""
+    logger = logging.getLogger(_LOG_NAME)
+    logger.setLevel(logging.DEBUG if verbose else logging.INFO)
+    ch = logging.FileHandler(filename)
+    ch.setLevel(logging.DEBUG)
+    logger.addHandler(ch)
diff --git 
a/packaging/services/ovirt-fence-kdump-listener/ovirt-fence-kdump-listener.py 
b/packaging/services/ovirt-fence-kdump-listener/ovirt-fence-kdump-listener.py
new file mode 100755
index 0000000..e443443
--- /dev/null
+++ 
b/packaging/services/ovirt-fence-kdump-listener/ovirt-fence-kdump-listener.py
@@ -0,0 +1,147 @@
+#!/usr/bin/python
+
+# Copyright 2013 Red Hat
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import gettext
+import os
+import Queue
+import sys
+_ = lambda m: gettext.dgettext(message=m, domain="ovirt-engine")
+
+
+import config
+import db
+import flowterminator
+import heartbeat
+import listener
+import logbase
+import qconsumer
+
+
+from ovirt_engine import configfile
+from ovirt_engine import service
+
+
+class Daemon(service.Daemon):
+
+    def __init__(self):
+        super(Daemon, self).__init__()
+        self._defaults = os.path.abspath(
+            os.path.join(
+                os.path.dirname(sys.argv[0]),
+                "ovirt-fence-kdump-listener.conf",
+            )
+        )
+
+    def _checkInstallation(
+        self,
+        pidfile,
+    ):
+        # Check the required engine directories and files:
+        self.check(
+            os.path.join(
+                self._config.get("ENGINE_USR"),
+                "services",
+            ),
+            directory=True,
+        )
+
+        if pidfile is not None:
+            self.check(
+                name=pidfile,
+                writable=True,
+                mustExist=False,
+            )
+
+    def daemonSetup(self):
+
+        if not os.path.exists(self._defaults):
+            raise RuntimeError(
+                _(
+                    "The configuration defaults file "{file}" "
+                    "required but missing"
+                ).format(
+                    file=self._defaults,
+                )
+            )
+
+        self._config = configfile.ConfigFile(
+            (
+                self._defaults,
+                config.ENGINE_VARS,
+            ),
+        )
+
+        self._checkInstallation(
+            pidfile=self.pidfile,
+        )
+
+    def daemonStdHandles(self):
+        consoleLog = open(os.devnull, "w+")
+        return (consoleLog, consoleLog)
+
+    def daemonContext(self):
+        logbase.setup_logger(
+            os.path.join(
+                self._config.get("ENGINE_LOG"),
+                "ovirt-fence-kdump-lsnr.log",
+            ),
+            os.environ.get('OVIRT_SERVICE_DEBUG', '0') == '0',
+        )
+
+        queue = Queue.Queue()
+
+        db_manager = db.Manager(
+            host=self._config.get("ENGINE_DB_HOST"),
+            port=self._config.get("ENGINE_DB_PORT"),
+            database=self._config.get("ENGINE_DB_DATABASE"),
+            username=self._config.get("ENGINE_DB_USER"),
+            password=self._config.get("ENGINE_DB_PASSWORD"),
+            secured=self._config.get("ENGINE_DB_SECURED"),
+            secure_validation=self._config.get("ENGINE_DB_SECURED_VALIDATION"),
+        )
+        db_manager.init_config()
+
+        cnsmr = qconsumer.QueueConsumer(
+            queue=queue,
+            db_manager=db_manager,
+        )
+        cnsmr.start()
+
+        ft = flowterminator.KdumpFlowTerminator(
+            queue=queue,
+            db_manager=db_manager,
+        )
+        ft.start()
+
+        hb = heartbeat.Heartbeat(
+            db_manager=db_manager
+        )
+        hb.start()
+
+        lsnr = listener.FenceKdumpListener(
+            db_manager.get_config_value("FenceKdumpListenerHost"),
+            db_manager.get_config_value("FenceKdumpListenerPort"),
+            queue=queue
+        )
+        lsnr.start()
+
+if __name__ == "__main__":
+    service.setupLogger()
+    d = Daemon()
+    d.run()
+
+
+# vim: expandtab tabstop=4 shiftwidth=4
diff --git a/packaging/services/ovirt-fence-kdump-listener/qconsumer.py 
b/packaging/services/ovirt-fence-kdump-listener/qconsumer.py
new file mode 100644
index 0000000..389a249
--- /dev/null
+++ b/packaging/services/ovirt-fence-kdump-listener/qconsumer.py
@@ -0,0 +1,94 @@
+import datetime
+import psycopg2
+import threading
+import time
+
+import db
+import logbase
+
+
+@logbase.Logable()
+class QueueConsumer(threading.Thread):
+    """Receives messages from queue and stores them into database"""
+
+    def __init__(self, queue, db_manager):
+        super(QueueConsumer, self).__init__()
+        self.daemon = True
+        self._queue = queue
+        self._db_manager = db_manager
+        self._next_kdump_timeout = datetime.timedelta(
+            seconds=self._db_manager.get_config_value(
+                "NextKdumpTimeout"
+            )
+        )
+        self._db_conn = None
+
+    def _create_message(self, conn, message):
+        cur = conn.cursor()
+        cur.execute(
+            "INSERT INTO fence_kdump_messages (host, received, status)"
+            " VALUES (%(host)s, %(received)s, %(status)s)",
+            message,
+        )
+        conn.commit()
+        cur.close()
+        self._logger.debug("Message '%s' wrote to database.", message)
+
+    def _get_latest(self, conn, host):
+        cur = conn.cursor()
+        cur.execute(
+            "SELECT host, received, status FROM fence_kdump_messages"
+            " WHERE host = %s"
+            " ORDER BY received DESC"
+            " LIMIT 1",
+            host,
+        )
+        records = cur.fetchmany()
+        result = db.Manager.record2message(records[0]) if records else None
+        cur.close()
+        self._logger.debug(
+            "Most recent record for host '%s' is '%s'",
+            host,
+            result,
+        )
+        return result
+
+    def run(self):
+        while True:
+            if self._queue.empty():
+                time.sleep(1)
+                continue
+
+            try:
+                conn = self._db_manager.open_connection()
+                message = self._queue.get()
+                if message["status"] is None:
+                    latest = self._get_latest(conn, message["host"])
+                    if (
+                        latest is None
+                        or (
+                            latest["received"] + self._next_kdump_timeout
+                            < message["received"]
+                        )
+                    ):
+                        message["status"] = "started"
+                    else:
+                        message["status"] = "dumping"
+                self._create_message(conn, message)
+
+                if message["status"] != "dumping":
+                    self._logger.info(
+                        "Host '%s' %s kdump flow.",
+                        message["host"],
+                        message["status"],
+                        )
+
+            except (psycopg2.Warning, psycopg2.Error) as e:
+                self._logger.error(
+                    "Error writing message '%s' to database: %s",
+                    message,
+                    e,
+                )
+            finally:
+                if self._queue.empty:
+                    self._db_manager.close_connection(conn)


-- 
To view, visit http://gerrit.ovirt.org/27201
To unsubscribe, visit http://gerrit.ovirt.org/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ieec3bad47bbba860a52a9ff4e2eb7f61277f4e36
Gerrit-PatchSet: 1
Gerrit-Project: ovirt-engine
Gerrit-Branch: master
Gerrit-Owner: Martin Peřina <mper...@redhat.com>
_______________________________________________
Engine-patches mailing list
Engine-patches@ovirt.org
http://lists.ovirt.org/mailman/listinfo/engine-patches

Reply via email to