Alon Bar-Lev has posted comments on this change.

Change subject: fklsnr: Introduce standalone fence_kdump listener
......................................................................

Patch Set 11:

(28 comments)

http://gerrit.ovirt.org/#/c/27201/11/packaging/services/ovirt-fence-kdump-listener/db.py
File packaging/services/ovirt-fence-kdump-listener/db.py:

Line 180: try:
Line 181: with contextlib.closing(
Line 182: self._connection.cursor()
Line 183: ) as cursor:
Line 184: cursor.callproc(
please make sure this is supported by RHEL's old driver.

Line 185: name,
Line 186: args,
Line 187: )
Line 188:

Line 191: while True:
Line 192: entry = cursor.fetchone()
Line 193: if entry is None:
Line 194: break
Line 195: ret.append(dict(zip(cols, entry)))
move the above into a function, as you reuse it now.

Line 196: except (psycopg2.Error, psycopg2.Warning) as e:
Line 197: raise DbException(
Line 198: message=(
Line 199: "Error calling procedure '%s'" % name

Line 216: ):
Line 217: super(EngineDao, self).__init__()
Line 218: self._db_mgr = db_manager
Line 219:
Line 220: def save_config_value(self, name, value, version='general'):
I do not think we need this, per what I wrote at service.

Line 221: self._db_mgr.execute_query(
Line 222: statement="""
Line 223: UPDATE vdc_options
Line 224: SET option_value=%(value)s

Line 236: def update_vds_kdump_status(self, vds_id, status, address):
Line 237: return self._db_mgr.call_procedure(
Line 238: name='UpsertKdumpStatus',
Line 239: args=(
Line 240: vds_id, # v_vds_id
the stored procedure can find the id itself, you should provide the address and port and let it do its magic.

Line 241: status, # v_status
Line 242: json.dumps(address), # v_address
Line 243: ),
Line 244: )

Line 262: # address needs to be converted from string to tuple
Line 263: record['address'] = tuple(json.loads(record['address']))
Line 264: return result
Line 265:
Line 266: def get_vds_id(self, address):
no need

Line 267: res = self._db_mgr.call_procedure(
Line 268: name='GetVdsIdByAddress',
Line 269: args=(
Line 270: address, # v_address

http://gerrit.ovirt.org/#/c/27201/11/packaging/services/ovirt-fence-kdump-listener/listener.py
File packaging/services/ovirt-fence-kdump-listener/listener.py:

Line 28: class FenceKdumpListener(base.Base):
Line 29: # Host kdump flow states
Line 30: SESSION_STATE_INITIAL = 'started'
Line 31: SESSION_STATE_DUMPING = 'dumping'
Line 32: SESSION_STATE_CLOSED = 'finished'
please be consistent ... CLOSED->FINISHED or finished->closed

from now I assume:
SESSION_STATE_FINISHED

and we need to add:
SESSION_STATE_CLOSED

the closed state is entered if:
1. database persisted the FINISHED
2. session is invalid as message format is incorrect
3. database update fails as address is unknown.

Line 33:
Line 34: # buffer size to receive message
Line 35: _BUF_SIZE = 0x20
Line 36:

Line 126: _(
Line 127: "Discarding invalid message '{msg}' from address "
Line 128: "'{address}'."
Line 129: ).format(
Line 130: msg=binascii.hexlify(message),
message.encode("hex") ?

Line 131: address=entry['address'][0],
Line 132: )
Line 133: )
Line 134:

Line 129: ).format(
Line 130: msg=binascii.hexlify(message),
Line 131: address=entry['address'][0],
Line 132: )
Line 133: )
you should have caught the exception in this function... not at the run() and mark the entry as 'CLOSED' (post FINISHED state), so that session house keeping can delete it.
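
To illustrate the comment above, here is a minimal sketch, not the actual patch: the function that processes a message handles the invalid case itself and moves the entry to CLOSED, so nothing propagates to run(). The names verify_message and handle_message, the 'closed' value, and the length check are assumptions made only for this example; the real validation depends on the fence_kdump message format.

```python
SESSION_STATE_INITIAL = 'started'
SESSION_STATE_DUMPING = 'dumping'
SESSION_STATE_FINISHED = 'finished'
SESSION_STATE_CLOSED = 'closed'  # assumed value for the new state


class InvalidMessage(Exception):
    pass


def verify_message(message):
    # Hypothetical check, stands in for the real format validation.
    if len(message) < 4:
        raise InvalidMessage('message too short')


def handle_message(entry, message):
    """Process one message; InvalidMessage never escapes to the caller."""
    try:
        verify_message(message)
    except InvalidMessage:
        # Invalid session: close it so house keeping can delete it.
        entry['status'] = SESSION_STATE_CLOSED
        return False
    return True
```

With this shape run() only needs the boolean result, and the decision about logging or counting invalid messages stays in one place.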

Line 134:
Line 135: # message is valid, update timestamp
Line 136: entry['updated'] = datetime.datetime.utcnow()
Line 137:

Line 136: entry['updated'] = datetime.datetime.utcnow()
Line 137:
Line 138: # if it's initial message, store it to sessions
Line 139: if entry['status'] == self.SESSION_STATE_INITIAL:
Line 140: self.logger.info(
not sure we need this info; better not to flood the log.

Line 141: _(
Line 142: "Host '{address}' started kdump flow."
Line 143: ).format(
Line 144: address=entry['address'][0],

Line 167: "Host '{address}' finished kdump flow."
Line 168: ).format(
Line 169: address=session['address'][0]
Line 170: )
Line 171: )
here you should also delete sessions that are marked as non dirty and are at CLOSED state

we should also have some protection against memory overflow... limiting the # of sessions... but let's finish all first.

Line 172:
Line 173: def _heartbeat(self):
Line 174: if self._interval_finished(
Line 175: interval=self._heartbeatInterval,

Line 178: self._lastHeartbeat = datetime.datetime.utcnow()
Line 179:
Line 180: def _save_heartbeat(self):
Line 181: self._dao.update_heartbeat()
Line 182: self._lastHeartbeat = datetime.datetime.utcnow()
why do we need to keep _lastHeartbeat? oh... in the _heartbeat function you set it... but where is it actually used?

Line 183:
Line 184: def _save_sessions(self):
Line 185: # update db state for all updated sessions
Line 186: for session in self._sessions.values():

Line 183:
Line 184: def _save_sessions(self):
Line 185: # update db state for all updated sessions
Line 186: for session in self._sessions.values():
Line 187: if session['updated'] > self._lastDbSync:
please add a 'dirty' flag to the session. it will make it easier as:
1. updated changes only when new data arrives, not when changing states.
2. you can set it and clear it for any other reason that requires persistence.

every update to the session object sets dirty=true; this function should check for dirty and persist, clearing dirty.
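
A rough sketch of the dirty flag idea, under the assumption that sessions stay plain dicts as in the patch. The helper names (mark_received, set_status, save_sessions) and the persist callable, which stands in for the DAO call, are hypothetical.

```python
def mark_received(session, now):
    # 'updated' changes only when new data arrives from the host.
    session['updated'] = now
    session['dirty'] = True


def set_status(session, status):
    # A state change needs persistence but does not touch 'updated'.
    session['status'] = status
    session['dirty'] = True


def save_sessions(sessions, persist):
    """Persist every dirty session and clear its flag.

    `persist` is a hypothetical callable wrapping the DAO update.
    Returns the number of sessions that were persisted.
    """
    saved = 0
    for session in sessions.values():
        if session.get('dirty'):
            persist(session)
            session['dirty'] = False
            saved += 1
    return saved
```

This removes the comparison against _lastDbSync from the save path, so a session is written exactly when something about it changed.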

Line 188: vds_id = self._dao.get_vds_id(
Line 189: address=session['address'][0],
Line 190: )
Line 191:

Line 186: for session in self._sessions.values():
Line 187: if session['updated'] > self._lastDbSync:
Line 188: vds_id = self._dao.get_vds_id(
Line 189: address=session['address'][0],
Line 190: )
we should not get the vds_id, but let the stored procedure do this.

the parameters to the stored procedure are the address and status.

the output of the stored procedure is the # of rows modified, so if 0 you have to drop the session.

Line 191:
Line 192: if vds_id is None:
Line 193: self.logger.error(
Line 194: _(

Line 208: )
Line 209:
Line 210: # remove finished sessions (engine will remove them from db)
Line 211: if session['status'] == 'finished':
Line 212: del self._sessions[session['address']]
this should be at house keeping. here you should just move the state to CLOSED

Line 213:
Line 214: def _load_sessions(self):
Line 215: self._lastDbSync = datetime.datetime.utcnow()
Line 216: for record in self._dao.get_unfinished_sessions():

Line 211: if session['status'] == 'finished':
Line 212: del self._sessions[session['address']]
Line 213:
Line 214: def _load_sessions(self):
Line 215: self._lastDbSync = datetime.datetime.utcnow()
this should be done as part of house keeping, once the FIRST database connection is successfully established.

not sure why we assume _lastDbSync as a success at this point... it should be None until actually synchronized.
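
One possible shape for this, as a sketch only: house keeping loads the sessions the first time the connection validates, and the last-sync timestamp stays None until a sync really succeeded. The class name HouseKeeping and the three callables passed to it are hypothetical stand-ins for the db manager and DAO calls in the patch.

```python
class HouseKeeping(object):
    def __init__(self, validate_connection, load_sessions, sync):
        # All three arguments are hypothetical callables for this sketch.
        self._validate_connection = validate_connection
        self._load_sessions = load_sessions
        self._sync = sync
        self._loaded = False
        self.last_db_sync = None  # None until actually synchronized

    def run_once(self, now):
        """Run one house keeping pass; return True if a sync happened."""
        if not self._validate_connection():
            return False
        if not self._loaded:
            # First successful connection: load stored sessions once.
            self._load_sessions()
            self._loaded = True
        self._sync()
        self.last_db_sync = now
        return True
```

The listener keeps handling incoming messages in the meantime; nothing in run() has to wait for the database.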

Line 216: for record in self._dao.get_unfinished_sessions():
Line 217: session = {
Line 218: 'status': record['status'],
Line 219: 'address': record['address'],

Line 220: 'updated': self._lastDbSync,
Line 221: }
Line 222: self._sessions[session['address']] = session
Line 223:
Line 224: def _db_sync(self):
this is part of house keeping

Line 225: if self._db_manager.validate_connection():
Line 226: try:
Line 227: self._save_heartbeat()
Line 228: self._save_sessions()

Line 223:
Line 224: def _db_sync(self):
Line 225: if self._db_manager.validate_connection():
Line 226: try:
Line 227: self._save_heartbeat()
I am not sure that the interval of heartbeat is the same as the interval of sessions, but let's keep this for now.

Line 228: self._save_sessions()
Line 229: self._lastDbSync = datetime.datetime.utcnow()
Line 230: except db.DbException as e:
Line 231: self.logger.error(

Line 242: _(
Line 243: "Database connection is not available, synchronization "
Line 244: "will be postponed."
Line 245: )
Line 246: )
this message should appear once when you lose the connection, and an info message should appear once you re-establish the connection. please do not flood the log.

Line 247:
Line 248: def run(self):
Line 249: # load session from db
Line 250: self._load_sessions()

Line 246: )
Line 247:
Line 248: def run(self):
Line 249: # load session from db
Line 250: self._load_sessions()
this should be done during house keeping when the first connection is established. no need to do this here and there. just defer until house keeping engages, in the meantime handle sessions.

Line 251:
Line 252: while True:
Line 253: (data, address) = self._recvfrom()
Line 254: if address is None:

Line 268: entry=entry,
Line 269: message=data,
Line 270: )
Line 271: except InvalidMessage as e:
Line 272: self.logger.error(e)
this will overflow the log if someone just sends invalid messages. it should be displayed only in debug, or as statistics every X seconds.
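
A small sketch of the "debug plus periodic statistics" option, assuming a standard logging logger. The class name InvalidMessageStats, the interval parameter and the message wording are hypothetical; timestamps are passed in as plain seconds so the caller decides which clock to use.

```python
import logging


class InvalidMessageStats(object):
    def __init__(self, logger, interval):
        self._logger = logger
        self._interval = interval  # seconds between summary lines
        self._count = 0
        self._last_report = 0.0

    def record(self, error):
        # Per-message details go to debug only.
        self._logger.debug("%s", error)
        self._count += 1

    def report(self, now):
        """Log one summary line per interval; return True if logged."""
        if self._count == 0 or now - self._last_report < self._interval:
            return False
        self._logger.warning(
            "Discarded %d invalid messages since last report", self._count
        )
        self._count = 0
        self._last_report = now
        return True
```

The except branch in run() would call record(e), and house keeping would call report() on each pass, so a flood of bad packets costs one log line per interval.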

Line 273:
Line 274: # try to sync with db
Line 275: self._db_sync()
Line 276:

Line 271: except InvalidMessage as e:
Line 272: self.logger.error(e)
Line 273:
Line 274: # try to sync with db
Line 275: self._db_sync()
this is part of house keeping.

Line 276:
Line 277:
Line 278: class InvalidMessage(Exception):
Line 279: pass

http://gerrit.ovirt.org/#/c/27201/11/packaging/services/ovirt-fence-kdump-listener/ovirt-fence-kdump-listener.conf.in
File packaging/services/ovirt-fence-kdump-listener/ovirt-fence-kdump-listener.conf.in:

Line 1: #
Line 2: # This is a default configuration file for oVirt/RHEV-M fence_kdump listener
Line 3: #
Line 4: TRACE_ENABLE=False
Line 5: TRACE_FILE=
do you use these?

Line 6: ENGINE_USR="@ENGINE_USR@"
Line 7:
Line 8: #
Line 9: # WARNING:

Line 2: # This is a default configuration file for oVirt/RHEV-M fence_kdump listener
Line 3: #
Line 4: TRACE_ENABLE=False
Line 5: TRACE_FILE=
Line 6: ENGINE_USR="@ENGINE_USR@"
you do not use ENGINE_USR

Line 7:
Line 8: #
Line 9: # WARNING:
Line 10: # If some of the following options are changed, engine needs to be

Line 20: # Defines the IP address(es) to send fence_kdump messages to from hosts
Line 21: DESTINATION_ADDRESS=
Line 22:
Line 23: # Defines interval in seconds between messages sent by fence_kdump
Line 24: MESSAGE_INTERVAL=5
I do not understand this one

Line 25:
Line 26: # Defines the interval in seconds of listener's heartbeat updates
Line 27: HEARTBEAT_INTERVAL=5
Line 28:

Line 26: # Defines the interval in seconds of listener's heartbeat updates
Line 27: HEARTBEAT_INTERVAL=5
Line 28:
Line 29: # Defines the max timeout for engine to detect if listener is alive
Line 30: LISTENER_ALIVE_TIMEOUT=10
this belongs to engine

Line 31:
Line 32: # Defines maximum timeout in seconds to wait until 1st message from kdumping
Line 33: # host is received (to detect that host kdump flow started).
Line 34: KDUMP_STARTED_TIMEOUT=30

Line 30: LISTENER_ALIVE_TIMEOUT=10
Line 31:
Line 32: # Defines maximum timeout in seconds to wait until 1st message from kdumping
Line 33: # host is received (to detect that host kdump flow started).
Line 34: KDUMP_STARTED_TIMEOUT=30
this belongs to engine

Line 35:
Line 36: # Defines maximum timeout in seconds after last received message from kdumping
Line 37: # hosts after which the host kdump flow is marked as FINISHED

http://gerrit.ovirt.org/#/c/27201/11/packaging/services/ovirt-fence-kdump-listener/ovirt-fence-kdump-listener.py
File packaging/services/ovirt-fence-kdump-listener/ovirt-fence-kdump-listener.py:

Line 56: self._config.get("ENGINE_USR"),
Line 57: "services",
Line 58: ),
Line 59: directory=True,
Line 60: )
so you can drop ENGINE_USR from configuration and drop this one... as you do not really need it.

Line 61:
Line 62: if pidfile is not None:
Line 63: self.check(
Line 64: name=pidfile,

Line 126: }:
Line 127: dao.save_config_value(
Line 128: option[1],
Line 129: self._config.get(option[0])
Line 130: )
this should be done (if at all) when the first connection is established.

and I do think that it is a mistake to do so.

please move all the engine variables to the engine side, do not modify options, there is no technical need to do so. this includes the destination address and destination port.

let's make life easier for us.

if you give the variables the same names, you can even have the same file symlinked at /etc/ovirt-engine/engine.conf.d and /etc/ovirt-engine/fence....conf.d/ to provide the same options. So you can even consider putting it in the engine local config. But please avoid the complexity resulting from database updates, and move the variables to the component that actually uses them.

Line 131:
Line 132: with listener.FenceKdumpListener(
Line 133: bind=(
Line 134: self._config.get('LISTENER_ADDRESS'),

--
To view, visit http://gerrit.ovirt.org/27201

Gerrit-MessageType: comment
Gerrit-Change-Id: Ieec3bad47bbba860a52a9ff4e2eb7f61277f4e36
Gerrit-PatchSet: 11
Gerrit-Project: ovirt-engine
Gerrit-Branch: master
Gerrit-Owner: Martin Peřina <mper...@redhat.com>
Gerrit-Reviewer: Alon Bar-Lev <alo...@redhat.com>
Gerrit-Reviewer: Barak Azulay <bazu...@redhat.com>
Gerrit-Reviewer: Eli Mesika <emes...@redhat.com>
Gerrit-Reviewer: Martin Peřina <mper...@redhat.com>
Gerrit-Reviewer: Oved Ourfali <oourf...@redhat.com>
Gerrit-Reviewer: Saggi Mizrahi <smizr...@redhat.com>
Gerrit-Reviewer: automat...@ovirt.org
Gerrit-Reviewer: oVirt Jenkins CI Server
Gerrit-HasComments: Yes