details:   https://code.tryton.org/tryton/commit/bb52a833e823
branch:    default
user:      Nicolas Évrard <[email protected]>
date:      Tue Nov 25 13:27:24 2025 +0100
description:
        Delegate the notifications handling to the backend
diffstat:

 trytond/trytond/backend/database.py            |   6 ++++++
 trytond/trytond/backend/postgresql/database.py |   8 ++++++++
 trytond/trytond/bus.py                         |  10 +++++-----
 trytond/trytond/cache.py                       |  11 +++++------
 trytond/trytond/ir/queue_.py                   |   3 +--
 5 files changed, 25 insertions(+), 13 deletions(-)

diffs (114 lines):

diff -r f5cd50fd92a5 -r bb52a833e823 trytond/trytond/backend/database.py
--- a/trytond/trytond/backend/database.py       Sat Mar 21 12:07:21 2026 +0100
+++ b/trytond/trytond/backend/database.py       Tue Nov 25 13:27:24 2025 +0100
@@ -73,6 +73,12 @@
     def estimated_count(self, connection, table):
         raise NotImplementedError
 
+    def notify(self, connection, channel, payload):
+        raise NotImplementedError
+
+    def get_notifications(self, connection):
+        raise NotImplementedError
+
     @classmethod
     def lock(cls, connection, table):
         raise NotImplementedError
diff -r f5cd50fd92a5 -r bb52a833e823 
trytond/trytond/backend/postgresql/database.py
--- a/trytond/trytond/backend/postgresql/database.py    Sat Mar 21 12:07:21 
2026 +0100
+++ b/trytond/trytond/backend/postgresql/database.py    Tue Nov 25 13:27:24 
2025 +0100
@@ -492,6 +492,14 @@
             cursor.execute(*from_item.select(Count(Literal('*'))))
         return cursor.fetchone()[0]
 
+    def notify(self, connection, channel, payload):
+        cursor = connection.cursor()
+        cursor.execute('NOTIFY "%s", %%s' % channel, (payload,))
+
+    def get_notifications(self, connection):
+        connection.poll()
+        return connection.notifies
+
     @classmethod
     def lock(cls, connection, table):
         cursor = connection.cursor()
diff -r f5cd50fd92a5 -r bb52a833e823 trytond/trytond/bus.py
--- a/trytond/trytond/bus.py    Sat Mar 21 12:07:21 2026 +0100
+++ b/trytond/trytond/bus.py    Tue Nov 25 13:27:24 2025 +0100
@@ -154,9 +154,9 @@
             selector.register(conn, selectors.EVENT_READ)
             while cls._queues[pid, database]['timeout'] > now:
                 selector.select(timeout=config.getint('bus', 'select_timeout'))
-                conn.poll()
-                while conn.notifies:
-                    notification = conn.notifies.pop()
+                notifications = db.get_notifications(conn)
+                while notifications:
+                    notification = notifications.pop()
                     payload = json.loads(
                         notification.payload,
                         object_hook=JSONDecoder())
@@ -200,13 +200,13 @@
             logger.info("database backend does not support channels")
             return
 
-        cursor = transaction.connection.cursor()
         message['message_id'] = str(uuid.uuid4())
         payload = json.dumps({
                 'channel': channel,
                 'message': message,
                 }, cls=JSONEncoder, separators=(',', ':'))
-        cursor.execute('NOTIFY "%s", %%s' % cls._channel, (payload,))
+        transaction.database.notify(
+            transaction.connection, cls._channel, payload)
         logger.debug(
             "publish %r to '%s' on '%s'",
             message, channel, transaction.database.name)
diff -r f5cd50fd92a5 -r bb52a833e823 trytond/trytond/cache.py
--- a/trytond/trytond/cache.py  Sat Mar 21 12:07:21 2026 +0100
+++ b/trytond/trytond/cache.py  Tue Nov 25 13:27:24 2025 +0100
@@ -308,9 +308,8 @@
                 # The count computed as
                 # 8000 (max notify size) / 64 (max name data len)
                 for sub_reset in grouped_slice(reset, 125):
-                    cursor.execute(
-                        'NOTIFY "%s", %%s' % cls._channel,
-                        (json.dumps(list(sub_reset), separators=(',', ':')),))
+                    database.notify(transaction.connection, cls._channel,
+                        json.dumps(list(sub_reset), separators=(',', ':')))
         else:
             connection = database.get_connection(
                 readonly=False, autocommit=True)
@@ -429,9 +428,9 @@
             while cls._local.listeners.get(dbname) == current_thread:
                 selector.select(
                     timeout=config.getint('cache', 'select_timeout'))
-                conn.poll()
-                while conn.notifies:
-                    notification = conn.notifies.pop()
+                notifications = database.get_notifications(conn)
+                while notifications:
+                    notification = notifications.pop()
                     payload = notification.payload
                     if payload and payload.startswith(REFRESH_POOL_MSG):
                         remote_id = payload[len(REFRESH_POOL_MSG) + 1:]
diff -r f5cd50fd92a5 -r bb52a833e823 trytond/trytond/ir/queue_.py
--- a/trytond/trytond/ir/queue_.py      Sat Mar 21 12:07:21 2026 +0100
+++ b/trytond/trytond/ir/queue_.py      Tue Nov 25 13:27:24 2025 +0100
@@ -61,7 +61,6 @@
     def push(cls, name, data, scheduled_at=None, expected_at=None):
         transaction = Transaction()
         database = transaction.database
-        cursor = transaction.connection.cursor()
         with without_check_access():
             record, = cls.create([{
                         'name': name,
@@ -70,7 +69,7 @@
                         'expected_at': expected_at,
                         }])
         if database.has_channel():
-            cursor.execute('NOTIFY "%s"', (cls.__name__,))
+            database.notify(transaction.connection, cls.__name__, '')
         if not config.getboolean('queue', 'worker', default=False):
             transaction.tasks.append(record.id)
         return record.id

Reply via email to