details: https://code.tryton.org/tryton/commit/bb52a833e823
branch: default
user: Nicolas Évrard <[email protected]>
date: Tue Nov 25 13:27:24 2025 +0100
description:
Delegate the notifications handling to the backend
diffstat:
trytond/trytond/backend/database.py | 6 ++++++
trytond/trytond/backend/postgresql/database.py | 8 ++++++++
trytond/trytond/bus.py | 10 +++++-----
trytond/trytond/cache.py | 11 +++++------
trytond/trytond/ir/queue_.py | 3 +--
5 files changed, 25 insertions(+), 13 deletions(-)
diffs (114 lines):
diff -r f5cd50fd92a5 -r bb52a833e823 trytond/trytond/backend/database.py
--- a/trytond/trytond/backend/database.py Sat Mar 21 12:07:21 2026 +0100
+++ b/trytond/trytond/backend/database.py Tue Nov 25 13:27:24 2025 +0100
@@ -73,6 +73,12 @@
def estimated_count(self, connection, table):
raise NotImplementedError
+ def notify(self, connection, channel, payload):
+ raise NotImplementedError
+
+ def get_notifications(self, connection):
+ raise NotImplementedError
+
@classmethod
def lock(cls, connection, table):
raise NotImplementedError
diff -r f5cd50fd92a5 -r bb52a833e823 trytond/trytond/backend/postgresql/database.py
--- a/trytond/trytond/backend/postgresql/database.py Sat Mar 21 12:07:21 2026 +0100
+++ b/trytond/trytond/backend/postgresql/database.py Tue Nov 25 13:27:24 2025 +0100
@@ -492,6 +492,14 @@
cursor.execute(*from_item.select(Count(Literal('*'))))
return cursor.fetchone()[0]
+ def notify(self, connection, channel, payload):
+ cursor = connection.cursor()
+ cursor.execute('NOTIFY "%s", %%s' % channel, (payload,))
+
+ def get_notifications(self, connection):
+ connection.poll()
+ return connection.notifies
+
@classmethod
def lock(cls, connection, table):
cursor = connection.cursor()
diff -r f5cd50fd92a5 -r bb52a833e823 trytond/trytond/bus.py
--- a/trytond/trytond/bus.py Sat Mar 21 12:07:21 2026 +0100
+++ b/trytond/trytond/bus.py Tue Nov 25 13:27:24 2025 +0100
@@ -154,9 +154,9 @@
selector.register(conn, selectors.EVENT_READ)
while cls._queues[pid, database]['timeout'] > now:
selector.select(timeout=config.getint('bus', 'select_timeout'))
- conn.poll()
- while conn.notifies:
- notification = conn.notifies.pop()
+ notifications = db.get_notifications(conn)
+ while notifications:
+ notification = notifications.pop()
payload = json.loads(
notification.payload,
object_hook=JSONDecoder())
@@ -200,13 +200,13 @@
logger.info("database backend does not support channels")
return
- cursor = transaction.connection.cursor()
message['message_id'] = str(uuid.uuid4())
payload = json.dumps({
'channel': channel,
'message': message,
}, cls=JSONEncoder, separators=(',', ':'))
- cursor.execute('NOTIFY "%s", %%s' % cls._channel, (payload,))
+ transaction.database.notify(
+ transaction.connection, cls._channel, payload)
logger.debug(
"publish %r to '%s' on '%s'",
message, channel, transaction.database.name)
diff -r f5cd50fd92a5 -r bb52a833e823 trytond/trytond/cache.py
--- a/trytond/trytond/cache.py Sat Mar 21 12:07:21 2026 +0100
+++ b/trytond/trytond/cache.py Tue Nov 25 13:27:24 2025 +0100
@@ -308,9 +308,8 @@
# The count computed as
# 8000 (max notify size) / 64 (max name data len)
for sub_reset in grouped_slice(reset, 125):
- cursor.execute(
- 'NOTIFY "%s", %%s' % cls._channel,
- (json.dumps(list(sub_reset), separators=(',', ':')),))
+ database.notify(transaction.connection, cls._channel,
+ json.dumps(list(sub_reset), separators=(',', ':')))
else:
connection = database.get_connection(
readonly=False, autocommit=True)
@@ -429,9 +428,9 @@
while cls._local.listeners.get(dbname) == current_thread:
selector.select(
timeout=config.getint('cache', 'select_timeout'))
- conn.poll()
- while conn.notifies:
- notification = conn.notifies.pop()
+ notifications = database.get_notifications(conn)
+ while notifications:
+ notification = notifications.pop()
payload = notification.payload
if payload and payload.startswith(REFRESH_POOL_MSG):
remote_id = payload[len(REFRESH_POOL_MSG) + 1:]
diff -r f5cd50fd92a5 -r bb52a833e823 trytond/trytond/ir/queue_.py
--- a/trytond/trytond/ir/queue_.py Sat Mar 21 12:07:21 2026 +0100
+++ b/trytond/trytond/ir/queue_.py Tue Nov 25 13:27:24 2025 +0100
@@ -61,7 +61,6 @@
def push(cls, name, data, scheduled_at=None, expected_at=None):
transaction = Transaction()
database = transaction.database
- cursor = transaction.connection.cursor()
with without_check_access():
record, = cls.create([{
'name': name,
@@ -70,7 +69,7 @@
'expected_at': expected_at,
}])
if database.has_channel():
- cursor.execute('NOTIFY "%s"', (cls.__name__,))
+ database.notify(transaction.connection, cls.__name__, '')
if not config.getboolean('queue', 'worker', default=False):
transaction.tasks.append(record.id)
return record.id