Martin Peřina has uploaded a new change for review. Change subject: core: Introduce standalone fence_kdump listener ......................................................................
core: Introduce standalone fence_kdump listener Introduces core parts of standalone fence_kdump listener. Change-Id: Ieec3bad47bbba860a52a9ff4e2eb7f61277f4e36 Bug-Url: https://bugzilla.redhat.com/1079821 Signed-off-by: Martin Perina <mper...@redhat.com> --- M Makefile A packaging/services/ovirt-fence-kdump-listener/__init__.py A packaging/services/ovirt-fence-kdump-listener/config.py.in A packaging/services/ovirt-fence-kdump-listener/db.py A packaging/services/ovirt-fence-kdump-listener/flowterminator.py A packaging/services/ovirt-fence-kdump-listener/heartbeat.py A packaging/services/ovirt-fence-kdump-listener/listener.py A packaging/services/ovirt-fence-kdump-listener/logbase.py A packaging/services/ovirt-fence-kdump-listener/ovirt-fence-kdump-listener.py A packaging/services/ovirt-fence-kdump-listener/qconsumer.py 10 files changed, 632 insertions(+), 0 deletions(-) git pull ssh://gerrit.ovirt.org:29418/ovirt-engine refs/changes/01/27201/1 diff --git a/Makefile b/Makefile index d0adbc5..33c4398 100644 --- a/Makefile +++ b/Makefile @@ -181,6 +181,7 @@ packaging/services/ovirt-websocket-proxy/ovirt-websocket-proxy.conf \ packaging/services/ovirt-websocket-proxy/ovirt-websocket-proxy.systemd \ packaging/services/ovirt-websocket-proxy/ovirt-websocket-proxy.sysv \ + packaging/services/ovirt-fence-kdump-listener/config.py \ packaging/setup/bin/ovirt-engine-setup.env \ packaging/setup/ovirt_engine_setup/config.py \ packaging/sys-etc/logrotate.d/ovirt-engine \ diff --git a/packaging/services/ovirt-fence-kdump-listener/__init__.py b/packaging/services/ovirt-fence-kdump-listener/__init__.py new file mode 100644 index 0000000..aa5eb8e --- /dev/null +++ b/packaging/services/ovirt-fence-kdump-listener/__init__.py @@ -0,0 +1,19 @@ +# Copyright 2012 Red Hat +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +__all__ = [] + + +# vim: expandtab tabstop=4 shiftwidth=4 diff --git a/packaging/services/ovirt-fence-kdump-listener/config.py.in b/packaging/services/ovirt-fence-kdump-listener/config.py.in new file mode 100644 index 0000000..c8e07f2 --- /dev/null +++ b/packaging/services/ovirt-fence-kdump-listener/config.py.in @@ -0,0 +1,25 @@ +# Copyright 2012 Red Hat +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +DEV_PYTHON_DIR = '@DEV_PYTHON_DIR@' +ENGINE_VARS = '@ENGINE_VARS@' + + +import sys +if DEV_PYTHON_DIR: + sys.path.append(DEV_PYTHON_DIR) + + +# vim: expandtab tabstop=4 shiftwidth=4 diff --git a/packaging/services/ovirt-fence-kdump-listener/db.py b/packaging/services/ovirt-fence-kdump-listener/db.py new file mode 100644 index 0000000..297f36e --- /dev/null +++ b/packaging/services/ovirt-fence-kdump-listener/db.py @@ -0,0 +1,119 @@ +import psycopg2 + +import logbase + + +@logbase.Logable() +class Manager(object): + def __init__( + self, + host, + port, + database, + username, + password, + secured, + secure_validation, + ): + self._db_dsn = ( + "host=%s port=%s dbname=%s user=%s password=%s sslmode=%s" % ( + host, + port, + database, + username, + password, + Manager._get_ssl_mode(secured, secure_validation), + ) + ) + self._logger.debug("Database dsn: '%s'", self._db_dsn) + + @staticmethod + def _get_ssl_mode(secured, secure_validation): + sslmode = "disable" + if secured == "True": + if secure_validation == "True": + sslmode = "verify-full" + else: + sslmode = "require" + return sslmode + + @staticmethod + def params2message(host, received, status): + """From specified arguments creates a message with format + { + "host": host, + "received": received, + "status": status + } + """ + return { + "host": host, + "received": received, + "status": status, + } + + @staticmethod + def record2message(record): + """From specified db record creates a message with format + { + "host": record[0], + "received": record[1], + "status": record[2] + } + """ + return Manager.params2message( + record[0], + record[1], + record[2], + ) if record is not None else None + + def _load_config_value(self, connection, name, version="general"): + cur = connection.cursor() + cur.execute( + "SELECT option_value FROM vdc_options" + " WHERE option_name=%(name)s" + " AND version=%(version)s", + name, + version, + ) + records = cur.fetchmany() + return records[0] if records else None + + def init_config(self): + self._config = {} + + conn = self.open_connection() + self._config["NextKdumpTimeout"] = self._load_config_value( + conn, + "NextKdumpTimeout", + ) + self._config["KdumpFinishedTimeout"] = self._load_config_value( + conn, + "KdumpFinishedTimeout", + ) + self._config["FenceKdumpListenerHost"] = self._load_config_value( + conn, + "FenceKdumpListenerHost", + ) + self._config["FenceKdumpListenerPort"] = self._load_config_value( + conn, + "FenceKdumpListenerPort", + ) + + def get_config_value(self, name): + return self._config["name"] + + def open_connection(self): + return psycopg2.connect(self._db_dsn) + + def close_connection(self, connection): + """Tries to close specified db connection. If error appeared, + it's logged""" + try: + if connection is not None: + connection.close() + except (psycopg2.Warning, psycopg2.Error) as e: + self._logger.error( + "Error closing connection: %s", + e, + ) diff --git a/packaging/services/ovirt-fence-kdump-listener/flowterminator.py b/packaging/services/ovirt-fence-kdump-listener/flowterminator.py new file mode 100644 index 0000000..b0187a2 --- /dev/null +++ b/packaging/services/ovirt-fence-kdump-listener/flowterminator.py @@ -0,0 +1,84 @@ +import datetime +import dateutil.tz +import psycopg2 +import threading +import time + +import db +import logbase + + +@logbase.Logable() +class KdumpFlowTerminator(threading.Thread): + """Creates record with status "finished" for finished kdump flows""" + + def __init__(self, queue, db_manager, kdump_finished_timeout): + super(KdumpFlowTerminator, self).__init__() + self.daemon = True + self._queue = queue + self._db_manager = db_manager + self.kdump_finished_timeout = datetime.timedelta( + seconds=self._db_manager.get_config_value( + "KdumpFinishedTimeout" + ) + ) + + @staticmethod + def _get_unfinished_flows(connection): + """Returns list of db records as messages for hosts which + kdump flow hasn't been finished yet""" + cur = connection.cursor() + cur.execute( + "SELECT fkm1.host, fkm1.received, fkm1.status" + " FROM fence_kdump_messages fkm1" + " JOIN (" + " SELECT host, MAX(received) AS latest" + " FROM fence_kdump_messages" + " GROUP BY host" + " ) fkm2" + " ON fkm1.host = fkm2.host" + " AND fkm1.received = fkm2.latest" + " WHERE fkm1.status <> 'finished'" + " AND fkm1.host <> 'fence_kdump_listener'" + ) + + result = [] + for record in cur.fetchall(): + result.append(db.Manager.record2message(record)) + cur.close() + return result + + def run(self): + conn = None + while True: + try: + conn = self._db_manager.open_connection() + + finished = datetime.datetime.now(dateutil.tz.tzlocal()) + records = KdumpFlowTerminator._get_unfinished_flows(conn) + for record in records: + if ( + record["received"] + self.kdump_finished_timeout + < finished + ): + msg = db.Manager.params2message( + record["host"], + finished, + "finished", + ) + self._logger.debug( + "Message '%s' sent to queue.", + msg, + ) + self._queue.put(msg) + + except (psycopg2.Warning, psycopg2.Error) as e: + self._logger.error( + "Error on finishing flows: %s", + e, + ) + finally: + self._db_manager.close_connection(conn) + conn = None + + time.sleep(30) diff --git a/packaging/services/ovirt-fence-kdump-listener/heartbeat.py b/packaging/services/ovirt-fence-kdump-listener/heartbeat.py new file mode 100644 index 0000000..ea92510 --- /dev/null +++ b/packaging/services/ovirt-fence-kdump-listener/heartbeat.py @@ -0,0 +1,57 @@ +import datetime +import dateutil.tz +import psycopg2 +import threading +import time + +import db +import logbase + + +@logbase.Logable() +class Heartbeat(threading.Thread): + """Periodically updates received to current time for record + with host 'fence-kdump-listener'""" + + def __init__(self, db_manager): + super(Heartbeat, self).__init__() + self.daemon = True + self._db_manager = db_manager + + @staticmethod + def _update_heartbeat(connection, timestamp): + cur = connection.cursor() + cur.execute( + "UPDATE fence_kdump_messages" + " SET received = %s" + " WHERE host = 'fence_kdump_listener'", + timestamp, + ) + connection.commit() + cur.close() + + def run(self): + conn = None + while True: + try: + conn = self._db_manager.open_connection() + + Heartbeat._update_heartbeat( + conn, + datetime.datetime.now(dateutil.tz.tzlocal()), + ) + + except (psycopg2.Warning, psycopg2.Error) as e: + self._logger.error( + "Error updating heartbeat: %s", + e, + ) + finally: + self._db_manager.close_connection(conn) + conn = None + + time.sleep( + self._db_manager.get_config_value( + "FenceKdumpListenerAliveTimeout" + ) + ) diff --git a/packaging/services/ovirt-fence-kdump-listener/listener.py b/packaging/services/ovirt-fence-kdump-listener/listener.py new file mode 100644 index 0000000..0c47a1a --- /dev/null +++ b/packaging/services/ovirt-fence-kdump-listener/listener.py @@ -0,0 +1,66 @@ +import binascii +import datetime +import dateutil.tz +import SocketServer +import struct + +import db +import logbase + + +@logbase.Logable() +class FenceKdumpRequestHandler(SocketServer.DatagramRequestHandler): + """Magic number fence_kdump message for version 1""" + _FK_MSG_MAGIC_V1 = 456141376 + + def _parse_number(self, data, offset=0): + number = None + if data is not None: + number = struct.unpack_from("<I", data, offset)[0] + return number + + def handle(self): + data = self.request[0].strip() + received = datetime.datetime.now(dateutil.tz.tzlocal()) + self._logger.debug( + ( + "Received message '%s' from host='%s' at '%s'." + ), + binascii.hexlify(data), + self.client_address[0], + received, + ) + + magic = self._parse_number(data) + version = self._parse_number(data, 4) + + if magic == self._FK_MSG_MAGIC_V1 and version == 1: + msg = db.Manager.params2message( + self.client_address[0], + received, + None, + ) + self._logger.debug( + "Message '%s' sent to queue.", + msg, + ) + self.server.queue.put(msg) + + +@logbase.Logable() +class FenceKdumpListener(object): + def __init__( + self, + host, + port, + queue, + ): + self._queue = queue + self._server = SocketServer.ThreadingUDPServer( + (host, port), + FenceKdumpRequestHandler, + ) + self._server.queue = self._queue + + def start(self): + self._server.serve_forever() diff --git a/packaging/services/ovirt-fence-kdump-listener/logbase.py b/packaging/services/ovirt-fence-kdump-listener/logbase.py new file mode 100644 index 0000000..76ad459 --- /dev/null +++ b/packaging/services/ovirt-fence-kdump-listener/logbase.py @@ -0,0 +1,20 @@ +import logging + + +_LOG_NAME = 'ovirt-fence-kdump-listener' + + +class Logable(object): + def __call__(self, target): + _logger = logging.getLogger(".".join((_LOG_NAME, target.__name__))) + setattr(target, "_logger", _logger) + return target + + +def setup_logger(filename, verbose=False): + """Configures logging to specified filename""" + logger = logging.getLogger(_LOG_NAME) + logger.setLevel(logging.DEBUG if verbose else logging.INFO) + ch = logging.FileHandler(filename) + ch.setLevel(logging.DEBUG) + logger.addHandler(ch) diff --git a/packaging/services/ovirt-fence-kdump-listener/ovirt-fence-kdump-listener.py b/packaging/services/ovirt-fence-kdump-listener/ovirt-fence-kdump-listener.py new file mode 100755 index 0000000..e443443 --- /dev/null +++ b/packaging/services/ovirt-fence-kdump-listener/ovirt-fence-kdump-listener.py @@ -0,0 +1,147 @@ +#!/usr/bin/python + +# Copyright 2013 Red Hat +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import gettext +import os +import Queue +import sys +_ = lambda m: gettext.dgettext(message=m, domain="ovirt-engine") + + +import config +import db +import flowterminator +import heartbeat +import listener +import logbase +import qconsumer + + +from ovirt_engine import configfile +from ovirt_engine import service + + +class Daemon(service.Daemon): + + def __init__(self): + super(Daemon, self).__init__() + self._defaults = os.path.abspath( + os.path.join( + os.path.dirname(sys.argv[0]), + "ovirt-fence-kdump-listener.conf", + ) + ) + + def _checkInstallation( + self, + pidfile, + ): + # Check the required engine directories and files: + self.check( + os.path.join( + self._config.get("ENGINE_USR"), + "services", + ), + directory=True, + ) + + if pidfile is not None: + self.check( + name=pidfile, + writable=True, + mustExist=False, + ) + + def daemonSetup(self): + + if not os.path.exists(self._defaults): + raise RuntimeError( + _( + "The configuration defaults file "{file}" " + "required but missing" + ).format( + file=self._defaults, + ) + ) + + self._config = configfile.ConfigFile( + ( + self._defaults, + config.ENGINE_VARS, + ), + ) + + self._checkInstallation( + pidfile=self.pidfile, + ) + + def daemonStdHandles(self): + consoleLog = open(os.devnull, "w+") + return (consoleLog, consoleLog) + + def daemonContext(self): + logbase.setup_logger( + os.path.join( + self._config.get("ENGINE_LOG"), + "ovirt-fence-kdump-lsnr.log", + ), + os.environ.get('OVIRT_SERVICE_DEBUG', '0') == '0', + ) + + queue = Queue.Queue() + + db_manager = db.Manager( + host=self._config.get("ENGINE_DB_HOST"), + port=self._config.get("ENGINE_DB_PORT"), + database=self._config.get("ENGINE_DB_DATABASE"), + username=self._config.get("ENGINE_DB_USER"), + password=self._config.get("ENGINE_DB_PASSWORD"), + secured=self._config.get("ENGINE_DB_SECURED"), + secure_validation=self._config.get("ENGINE_DB_SECURED_VALIDATION"), + ) + db_manager.init_config() + + cnsmr = qconsumer.QueueConsumer( + queue=queue, + db_manager=db_manager, + ) + cnsmr.start() + + ft = flowterminator.KdumpFlowTerminator( + queue=queue, + db_manager=db_manager, + ) + ft.start() + + hb = heartbeat.Heartbeat( + db_manager=db_manager + ) + hb.start() + + lsnr = listener.FenceKdumpListener( + db_manager.get_config_value("FenceKdumpListenerHost"), + db_manager.get_config_value("FenceKdumpListenerPort"), + queue=queue + ) + lsnr.start() + +if __name__ == "__main__": + service.setupLogger() + d = Daemon() + d.run() + + +# vim: expandtab tabstop=4 shiftwidth=4 diff --git a/packaging/services/ovirt-fence-kdump-listener/qconsumer.py b/packaging/services/ovirt-fence-kdump-listener/qconsumer.py new file mode 100644 index 0000000..389a249 --- /dev/null +++ b/packaging/services/ovirt-fence-kdump-listener/qconsumer.py @@ -0,0 +1,94 @@ +import datetime +import psycopg2 +import threading +import time + +import db +import logbase + + +@logbase.Logable() +class QueueConsumer(threading.Thread): + """Receives messages from queue and stores them into database""" + + def __init__(self, queue, db_manager): + super(QueueConsumer, self).__init__() + self.daemon = True + self._queue = queue + self._db_manager = db_manager + self._next_kdump_timeout = datetime.timedelta( + seconds=self._db_manager.get_config_value( + "NextKdumpTimeout" + ) + ) + self._db_conn = None + + def _create_message(self, conn, message): + cur = conn.cursor() + cur.execute( + "INSERT INTO fence_kdump_messages (host, received, status)" + " VALUES (%(host)s, %(received)s, %(status)s)", + message, + ) + conn.commit() + cur.close() + self._logger.debug("Message '%s' wrote to database.", message) + + def _get_latest(self, conn, host): + cur = conn.cursor() + cur.execute( + "SELECT host, received, status FROM fence_kdump_messages" + " WHERE host = %s" + " ORDER BY received DESC" + " LIMIT 1", + host, + ) + records = cur.fetchmany() + result = db.Manager.record2message(records[0]) if records else None + cur.close() + self._logger.debug( + "Most recent record for host '%s' is '%s'", + host, + result, + ) + return result + + def run(self): + while True: + if self._queue.empty(): + time.sleep(1) + continue + + try: + conn = self._db_manager.open_connection() + message = self._queue.get() + if message["status"] is None: + latest = self._get_latest(conn, message["host"]) + if ( + latest is None + or ( + latest["received"] + self._next_kdump_timeout + < message["received"] + ) + ): + message["status"] = "started" + else: + message["status"] = "dumping" + self._create_message(conn, message) + + if message["status"] != "dumping": + self._logger.info( + "Host '%s' %s kdump flow.", + message["host"], + message["status"], + ) + + except (psycopg2.Warning, psycopg2.Error) as e: + self._logger.error( + "Error writing message '%s' to database: %s", + message, + e, + ) + finally: + if self._queue.empty: + self._db_manager.close_connection(conn) -- To view, visit http://gerrit.ovirt.org/27201 To unsubscribe, visit http://gerrit.ovirt.org/settings Gerrit-MessageType: newchange Gerrit-Change-Id: Ieec3bad47bbba860a52a9ff4e2eb7f61277f4e36 Gerrit-PatchSet: 1 Gerrit-Project: ovirt-engine Gerrit-Branch: master Gerrit-Owner: Martin Peřina <mper...@redhat.com> _______________________________________________ Engine-patches mailing list Engine-patches@ovirt.org http://lists.ovirt.org/mailman/listinfo/engine-patches