Package: release.debian.org
Severity: normal
Tags: buster
User: release.debian....@packages.debian.org
Usertags: pu
Dear Stable Release team,

I'd like to upgrade oslo.messaging to version 8.1.4-1+deb10u1. In version
8.1.3, when a RabbitMQ server configured through transport_url dies (or is
turned off for maintenance), the OpenStack clients, meaning all services
running on the compute hosts, keep trying to reconnect to that same
RabbitMQ host instead of failing over to another node of the cluster. As a
consequence, upgrading a RabbitMQ cluster is *very* problematic: while one
rabbit node is being upgraded (and its service therefore stopped), the
clients connected to it never reconnect to the surviving nodes.

Attached is the debdiff for this change. Please allow me to upgrade
oslo.messaging in Buster.

Cheers,

Thomas Goirand (zigo)
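PS: for context, the affected deployments list several cluster nodes in
transport_url, for example (hostnames and credentials below are made up,
they do not come from the attached diff):

    [DEFAULT]
    # Three-node RabbitMQ cluster; on a connection failure the client is
    # supposed to try the next host in the list, not retry the dead one.
    transport_url = rabbit://openstack:secret@rabbit1:5672,openstack:secret@rabbit2:5672,openstack:secret@rabbit3:5672/

With 8.1.3, a service whose connection to rabbit1 dies keeps retrying
rabbit1 forever; with this fix it moves on to rabbit2 or rabbit3 as
intended.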
diff -Nru python-oslo.messaging-8.1.3/debian/changelog python-oslo.messaging-8.1.4/debian/changelog
--- python-oslo.messaging-8.1.3/debian/changelog	2019-05-17 14:33:29.000000000 +0200
+++ python-oslo.messaging-8.1.4/debian/changelog	2019-10-27 18:01:18.000000000 +0100
@@ -1,3 +1,10 @@
+python-oslo.messaging (8.1.4-1+deb10u1) buster; urgency=medium
+
+  * New upstream point release, with an important fix:
+    - Fix switch connection destination when a rabbitmq cluster node disappear.
+
+ -- Thomas Goirand <z...@debian.org>  Sun, 27 Oct 2019 18:01:18 +0100
+
 python-oslo.messaging (8.1.3-1) unstable; urgency=medium
 
   * New upstream point release, which includes this fix:
diff -Nru python-oslo.messaging-8.1.3/doc/requirements.txt python-oslo.messaging-8.1.4/doc/requirements.txt
--- python-oslo.messaging-8.1.3/doc/requirements.txt	2019-04-23 09:50:41.000000000 +0200
+++ python-oslo.messaging-8.1.4/doc/requirements.txt	2019-08-09 21:54:57.000000000 +0200
@@ -3,7 +3,8 @@
 # process, which may cause wedges in the gate later.
 openstackdocstheme>=1.18.1 # Apache-2.0
-sphinx!=1.6.6,!=1.6.7,>=1.6.2 # BSD
+sphinx!=1.6.6,!=1.6.7,>=1.6.2,<2.0.0;python_version=='2.7' # BSD
+sphinx!=1.6.6,!=1.6.7,>=1.6.2;python_version>='3.4' # BSD
 reno>=2.5.0 # Apache-2.0
 
 # imported when the source code is parsed for generating documentation:
diff -Nru python-oslo.messaging-8.1.3/oslo_messaging/_drivers/amqpdriver.py python-oslo.messaging-8.1.4/oslo_messaging/_drivers/amqpdriver.py
--- python-oslo.messaging-8.1.3/oslo_messaging/_drivers/amqpdriver.py	2019-04-23 09:50:41.000000000 +0200
+++ python-oslo.messaging-8.1.4/oslo_messaging/_drivers/amqpdriver.py	2019-08-09 21:54:57.000000000 +0200
@@ -167,8 +167,34 @@
                       'duration': duration})
             return
 
+    def heartbeat(self):
+        with self.listener.driver._get_connection(
+                rpc_common.PURPOSE_SEND) as conn:
+            self._send_reply(conn, None, None, ending=False)
+
+    # NOTE(sileht): Those have already be ack in RpcListener IO thread
+    # We keep them as noop until all drivers do the same
     def acknowledge(self):
-        self._message_operations_handler.do(self.message.acknowledge)
+        pass
+
+    def requeue(self):
+        pass
+
+
+class NotificationAMQPIncomingMessage(AMQPIncomingMessage):
+    def acknowledge(self):
+        def _do_ack():
+            try:
+                self.message.acknowledge()
+            except Exception as exc:
+                # NOTE(kgiusti): this failure is likely due to a loss of the
+                # connection to the broker. Not much we can do in this case,
+                # especially considering the Notification has already been
+                # dispatched. This *could* result in message duplication
+                # (unacked msg is returned to the queue by the broker), but the
+                # driver tries to catch that using the msg_id_cache.
+                LOG.warning("Failed to acknowledge received message: %s", exc)
+        self._message_operations_handler.do(_do_ack)
         self.listener.msg_id_cache.add(self.unique_id)
 
     def requeue(self):
@@ -178,12 +204,12 @@
         # msg_id_cache, the message will be reconsumed, the only difference is
         # the message stay at the beginning of the queue instead of moving to
         # the end.
-        self._message_operations_handler.do(self.message.requeue)
-
-    def heartbeat(self):
-        with self.listener.driver._get_connection(
-                rpc_common.PURPOSE_SEND) as conn:
-            self._send_reply(conn, None, None, ending=False)
+        def _do_requeue():
+            try:
+                self.message.requeue()
+            except Exception as exc:
+                LOG.warning("Failed to requeue received message: %s", exc)
+        self._message_operations_handler.do(_do_requeue)
 
 
 class ObsoleteReplyQueuesCache(object):
@@ -256,7 +282,7 @@
         else:
             LOG.debug("received message with unique_id: %s", unique_id)
 
-        self.incoming.append(AMQPIncomingMessage(
+        self.incoming.append(self.message_cls(
             self,
             ctxt.to_dict(),
             message,
@@ -319,6 +345,41 @@
             self.conn.close()
 
 
+class RpcAMQPListener(AMQPListener):
+    message_cls = AMQPIncomingMessage
+
+    def __call__(self, message):
+        # NOTE(kgiusti): In the original RPC implementation the RPC server
+        # would acknowledge the request THEN process it. The goal of this was
+        # to prevent duplication if the ack failed. Should the ack fail the
+        # request would be discarded since the broker would not remove the
+        # request from the queue since no ack was received. That would lead to
+        # the request being redelivered at some point. However this approach
+        # meant that the ack was issued from the dispatch thread, not the
+        # consumer thread, which is bad since kombu is not thread safe. So a
+        # change was made to schedule the ack to be sent on the consumer thread
+        # - breaking the ability to catch ack errors before dispatching the
+        # request. To fix this we do the actual ack here in the consumer
+        # callback and avoid the upcall if the ack fails. See
+        # https://bugs.launchpad.net/oslo.messaging/+bug/1695746
+        # for all the gory details...
+        try:
+            message.acknowledge()
+        except Exception as exc:
+            LOG.warning("Discarding RPC request due to failed acknowlege: %s",
+                        exc)
+        else:
+            # NOTE(kgiusti): be aware that even if the acknowledge call
+            # succeeds there is no guarantee the broker actually gets the ACK
+            # since acknowledge() simply writes the ACK to the socket (there is
+            # no ACK confirmation coming back from the broker)
+            super(RpcAMQPListener, self).__call__(message)
+
+
+class NotificationAMQPListener(AMQPListener):
+    message_cls = NotificationAMQPIncomingMessage
+
+
 class ReplyWaiters(object):
 
     WAKE_UP = object()
@@ -590,7 +651,7 @@
     def listen(self, target, batch_size, batch_timeout):
         conn = self._get_connection(rpc_common.PURPOSE_LISTEN)
 
-        listener = AMQPListener(self, conn)
+        listener = RpcAMQPListener(self, conn)
 
         conn.declare_topic_consumer(exchange_name=self._get_exchange(target),
                                     topic=target.topic,
@@ -608,7 +669,7 @@
                                        batch_size, batch_timeout):
         conn = self._get_connection(rpc_common.PURPOSE_LISTEN)
 
-        listener = AMQPListener(self, conn)
+        listener = NotificationAMQPListener(self, conn)
         for target, priority in targets_and_priorities:
             conn.declare_topic_consumer(
                 exchange_name=self._get_exchange(target),
diff -Nru python-oslo.messaging-8.1.3/oslo_messaging/_drivers/common.py python-oslo.messaging-8.1.4/oslo_messaging/_drivers/common.py
--- python-oslo.messaging-8.1.3/oslo_messaging/_drivers/common.py	2019-04-23 09:50:41.000000000 +0200
+++ python-oslo.messaging-8.1.4/oslo_messaging/_drivers/common.py	2019-08-09 21:54:57.000000000 +0200
@@ -373,7 +373,7 @@
 # greenthread.
 # So, a connection cannot be shared between thread/greenthread and
 # this two variables permit to define the purpose of the connection
-# to allow drivers to add special handling if needed (like heatbeat).
+# to allow drivers to add special handling if needed (like heartbeat).
 # amqp drivers create 3 kind of connections:
 # * driver.listen*(): each call create a new 'PURPOSE_LISTEN' connection
 # * driver.send*(): a pool of 'PURPOSE_SEND' connections is used
diff -Nru python-oslo.messaging-8.1.3/oslo_messaging/_drivers/impl_rabbit.py python-oslo.messaging-8.1.4/oslo_messaging/_drivers/impl_rabbit.py
--- python-oslo.messaging-8.1.3/oslo_messaging/_drivers/impl_rabbit.py	2019-04-23 09:50:41.000000000 +0200
+++ python-oslo.messaging-8.1.4/oslo_messaging/_drivers/impl_rabbit.py	2019-08-09 21:54:57.000000000 +0200
@@ -610,7 +610,7 @@
         # expected waiting the events drain, we start heartbeat_check and
         # retrieve the server heartbeat packet only two times more than
         # the minimum required for the heartbeat works
-        # (heatbeat_timeout/heartbeat_rate/2.0, default kombu
+        # (heartbeat_timeout/heartbeat_rate/2.0, default kombu
         # heartbeat_rate is 2)
         self._heartbeat_wait_timeout = (
             float(self.heartbeat_timeout_threshold) /
@@ -635,7 +635,7 @@
 
         # NOTE(sileht): value chosen according the best practice from kombu
         # http://kombu.readthedocs.org/en/latest/reference/kombu.common.html#kombu.common.eventloop
-        # For heatbeat, we can set a bigger timeout, and check we receive the
+        # For heartbeat, we can set a bigger timeout, and check we receive the
         # heartbeat packets regulary
         if self._heartbeat_supported_and_enabled():
             self._poll_timeout = self._heartbeat_wait_timeout
@@ -974,6 +974,14 @@
 
     def _heartbeat_thread_job(self):
         """Thread that maintains inactive connections
         """
+        # NOTE(hberaud): Python2 doesn't have ConnectionRefusedError
+        # defined so to switch connections destination on failure
+        # with python2 and python3 we need to wrapp adapt connection refused
+        try:
+            ConnectRefuseError = ConnectionRefusedError
+        except NameError:
+            ConnectRefuseError = socket.error
+
         while not self._heartbeat_exit_event.is_set():
             with self._connection_lock.for_heartbeat():
 
@@ -990,14 +998,24 @@
                             self.connection.drain_events(timeout=0.001)
                         except socket.timeout:
                             pass
+                    # NOTE(hberaud): In a clustered rabbitmq when
+                    # a node disappears, we get a ConnectionRefusedError
+                    # because the socket get disconnected.
+                    # The socket access yields a OSError because the heartbeat
+                    # tries to reach an unreachable host (No route to host).
+                    # Catch these exceptions to ensure that we call
+                    # ensure_connection for switching the
+                    # connection destination.
                     except (socket.timeout,
+                            ConnectRefuseError,
+                            OSError,
                             kombu.exceptions.OperationalError) as exc:
                         LOG.info(_LI("A recoverable connection/channel error "
                                      "occurred, trying to reconnect: %s"), exc)
                         self.ensure_connection()
                     except Exception:
-                        LOG.warning(_LW("Unexpected error during heartbeart "
-                                        "thread processing, retrying..."))
+                        LOG.warning(_LW("Unexpected error during heartbeat "
+                                        "thread processing, retrying..."))
                         LOG.debug('Exception', exc_info=True)
 
             self._heartbeat_exit_event.wait(
diff -Nru python-oslo.messaging-8.1.3/oslo_messaging/rpc/server.py python-oslo.messaging-8.1.4/oslo_messaging/rpc/server.py
--- python-oslo.messaging-8.1.3/oslo_messaging/rpc/server.py	2019-04-23 09:50:41.000000000 +0200
+++ python-oslo.messaging-8.1.4/oslo_messaging/rpc/server.py	2019-08-09 21:54:57.000000000 +0200
@@ -152,6 +152,9 @@
 
     def _process_incoming(self, incoming):
         message = incoming[0]
+
+        # TODO(sileht): We should remove that at some point and do
+        # this directly in the driver
         try:
             message.acknowledge()
         except Exception:
diff -Nru python-oslo.messaging-8.1.3/test-requirements.txt python-oslo.messaging-8.1.4/test-requirements.txt
--- python-oslo.messaging-8.1.3/test-requirements.txt	2019-04-23 09:50:41.000000000 +0200
+++ python-oslo.messaging-8.1.4/test-requirements.txt	2019-08-09 21:54:57.000000000 +0200
@@ -31,7 +31,7 @@
 pyngus>=2.2.0 # Apache-2.0
 
 # Bandit security code scanner
-bandit>=1.1.0 # Apache-2.0
+bandit>=1.1.0,<1.6.0 # Apache-2.0
 
 eventlet!=0.18.3,!=0.20.1,>=0.18.2 # MIT
 greenlet>=0.4.10 # MIT
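
Note for reviewers: the core of the fix is the widened except clause in
_heartbeat_thread_job() above, together with the ConnectionRefusedError
aliasing at the top of that method. Here is a minimal standalone sketch of
that Python 2/3 aliasing pattern; the probe() helper and its host/port
arguments are illustrative only and are not part of the patch:

    import socket

    # Python 2 has no ConnectionRefusedError builtin; the nearest catchable
    # type there is socket.error. Aliasing it once lets a single except
    # clause work on both interpreters, as the patch does.
    try:
        ConnectRefuseError = ConnectionRefusedError
    except NameError:  # Python 2: the builtin name does not exist
        ConnectRefuseError = socket.error

    def probe(host, port):
        """Try a TCP connection; a refusal is what triggers the failover."""
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.connect((host, port))
        except ConnectRefuseError as exc:
            print("connection refused, move to the next node: %s" % exc)
        finally:
            sock.close()

Catching the refusal (rather than letting it escape as an unexpected error)
is what lets the heartbeat thread call ensure_connection() and switch to
another cluster node.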