details: https://code.tryton.org/tryton/commit/44edc21632c6
branch: default
user: Cédric Krier <[email protected]>
date: Sat Apr 04 23:03:18 2026 +0200
description:
Add REST API
diffstat:
trytond/CHANGELOG | 1 +
trytond/doc/ref/models.rst | 7 +
trytond/trytond/model/modelsql.py | 4 +-
trytond/trytond/model/modelstorage.py | 4 +
trytond/trytond/protocols/jsonrpc.py | 3 +-
trytond/trytond/protocols/rest.py | 298 +++++++++++++++++++++++++++++++++
trytond/trytond/protocols/wrappers.py | 45 ++++-
trytond/trytond/res/user.py | 4 +-
trytond/trytond/tests/test_rest.py | 300 ++++++++++++++++++++++++++++++++++
trytond/trytond/wsgi.py | 1 +
10 files changed, 661 insertions(+), 6 deletions(-)
diffs (774 lines):
diff -r bf5a02b5446c -r 44edc21632c6 trytond/CHANGELOG
--- a/trytond/CHANGELOG Sun Nov 12 23:43:19 2023 +0100
+++ b/trytond/CHANGELOG Sat Apr 04 23:03:18 2026 +0200
@@ -1,3 +1,4 @@
+* Add REST API
* Add rpc suffix to URL for RPC call
* Support mounting application under a prefix
* Send emails on chat messages
diff -r bf5a02b5446c -r 44edc21632c6 trytond/doc/ref/models.rst
--- a/trytond/doc/ref/models.rst Sun Nov 12 23:43:19 2023 +0100
+++ b/trytond/doc/ref/models.rst Sat Apr 04 23:03:18 2026 +0200
@@ -352,6 +352,13 @@
Class methods:
+.. classmethod:: ModelStorage.__json__([usages])
+
+ Returns a :py:class:`list <list>` of the fields to construct the JSON for
+ the REST API.
+ ``usages`` is a :py:class:`set <set>` containing the usage strings.
+
+
.. classmethod:: ModelStorage.log(records, event[, target[, user[, \**extra]]])
Log event for records.
diff -r bf5a02b5446c -r 44edc21632c6 trytond/trytond/model/modelsql.py
--- a/trytond/trytond/model/modelsql.py Sun Nov 12 23:43:19 2023 +0100
+++ b/trytond/trytond/model/modelsql.py Sat Apr 04 23:03:18 2026 +0200
@@ -1165,7 +1165,7 @@
if 'write_date' not in fields_names:
extra_fields.add('write_date')
for field_name in fields_names:
- if field_name in {'_timestamp', '_write', '_delete'}:
+ if field_name in {'_timestamp', '_write', '_delete', '__name__'}:
continue
if '.' in field_name:
field_name, field_related = field_name.split('.', 1)
@@ -1237,6 +1237,8 @@
columns[f] = Extract(
'EPOCH', Coalesce(table.write_date, table.create_date)
).cast(sql_type).as_('_timestamp')
+ elif f == '__name__':
+ columns[f] = Literal(cls.__name__).as_('__name__')
if ('write_date' not in fields_names
and columns.keys() == {'write_date'}):
diff -r bf5a02b5446c -r 44edc21632c6 trytond/trytond/model/modelstorage.py
--- a/trytond/trytond/model/modelstorage.py Sun Nov 12 23:43:19 2023 +0100
+++ b/trytond/trytond/model/modelstorage.py Sat Apr 04 23:03:18 2026 +0200
@@ -1844,6 +1844,10 @@
super().__init__(id, **kwargs)
+ @classmethod
+ def __json__(cls, usages=None):
+ return ['__name__', 'id', 'rec_name']
+
@property
def _cache(self):
return self._transaction_cache[self.__name__]
diff -r bf5a02b5446c -r 44edc21632c6 trytond/trytond/protocols/jsonrpc.py
--- a/trytond/trytond/protocols/jsonrpc.py Sun Nov 12 23:43:19 2023 +0100
+++ b/trytond/trytond/protocols/jsonrpc.py Sat Apr 04 23:03:18 2026 +0200
@@ -9,12 +9,11 @@
from werkzeug.exceptions import (
BadRequest, Conflict, Forbidden, HTTPException, InternalServerError,
Locked, TooManyRequests)
-from werkzeug.wrappers import Response
from trytond.exceptions import (
ConcurrencyException, LoginException, MissingDependenciesException,
RateLimitException, TrytonException, UserWarning)
-from trytond.protocols.wrappers import GzipStream, Request
+from trytond.protocols.wrappers import GzipStream, Request, Response
from trytond.tools import cached_property
diff -r bf5a02b5446c -r 44edc21632c6 trytond/trytond/protocols/rest.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/trytond/trytond/protocols/rest.py Sat Apr 04 23:03:18 2026 +0200
@@ -0,0 +1,298 @@
+# This file is part of Tryton. The COPYRIGHT file at the top level of
+# this repository contains the full copyright notices and license terms.
+
+import mimetypes
+from base64 import urlsafe_b64decode
+from functools import partial, wraps
+
+from trytond.config import config
+from trytond.model.exceptions import AccessError
+from trytond.pool import Pool
+from trytond.tools import is_instance_method
+from trytond.transaction import Transaction
+from trytond.wsgi import app
+
+from .jsonrpc import JSONDecoder, JSONEncoder, json
+from .wrappers import (
+ HTTPStatus, Response, abort, user_application, with_pool, with_transaction)
+
+_rest = user_application('rest')
+_json_decoder = JSONDecoder()
+_request_timeout = config.getint('request', 'timeout', default=0)
+
+
+def rest(func):
+    "Call func as 'rest' user application within the request context"
+    @wraps(func)
+    def wrapper(request, pool, *args, **kwargs):
+        transaction = Transaction()
+        context = {}
+        context_header = request.headers.get('X-Tryton-Context')
+        if context_header:
+            context = json.loads(
+                urlsafe_b64decode(context_header),
+                object_hook=_json_decoder)
+        languages = request.headers.get('Accept-Language')
+        if languages:
+            pairs = []
+            for language in languages.split(','):
+                language, _, q = language.partition(';')
+                try:
+                    q = float(q.split('=')[1]) if q else 1
+                except (IndexError, ValueError):
+                    continue  # ignore a malformed quality value
+                pairs.append((q, language.strip()))
+            if pairs:
+                # max keeps the first listed among equal qualities
+                language = max(pairs, key=lambda pair: pair[0])[1]
+                context['language'] = language.replace('-', '_')
+        with transaction.set_context(context=context):
+            response = _rest(func)(request, pool, *args, **kwargs)
+        response.headers['Content-Language'] = (
+            transaction.language.replace('_', '-'))
+        return response
+    return wrapper
+
+
+def get_usages(request):
+    "Return the set of non empty usages from the X-Tryton-Usage header"
+    usages = request.headers.get('X-Tryton-Usage', '').split(',')
+    return {usage.strip() for usage in usages if usage.strip()}
+
+
+def _get_fields(Model, request):
+ pool = Pool()
+ ModelAccess = pool.get('ir.model.access')
+ ModelFieldAccess = pool.get('ir.model.field.access')
+ model_check = partial(
+ ModelAccess.check, mode='read', raise_exception=False)
+ field_check = partial(
+ ModelFieldAccess.check, mode='read', raise_exception=False)
+
+ def has_access(field):
+ paths = field.split('.')
+ model = Model
+ while paths:
+ field = paths.pop(0)
+ if (not model_check(model.__name__)
+ or not field_check(model.__name__, [field])):
+ return False
+ if paths:
+ model = getattr(model, field).get_target()
+ return True
+
+ if 'f' in request.args:
+ fields = request.args.getlist('f')
+ for path in list(fields):
+ paths = path.split('.')[:-1]
+ while paths:
+ field = '.'.join(paths)
+ fields.append(f'{field}.id')
+ fields.append(f'{field}.__name__')
+ paths.pop()
+ else:
+ fields = Model.__json__(get_usages(request))
+ fields.extend({'id', '__name__'})
+ return list(filter(has_access, set(_flatten_fields(fields))))
+
+
+def _flatten_fields(fields):
+    "Yield dotted names from names and (parent, nested fields) pairs"
+    for item in fields:
+        if isinstance(item, str):
+            yield item
+            continue
+        parent, nested = item
+        for name in _flatten_fields(['id', '__name__', *nested]):
+            yield f'{parent}.{name}'
+
+
+def _read(Model, request, id):
+ result = Model.read([id], _get_fields(Model, request))[0]
+ return _remove_dots(result)
+
+
+def _remove_dots(result):
+    "Strip the trailing dot from dict keys, recursing into their values"
+    if not isinstance(result, dict):
+        return [_remove_dots(item) for item in result]
+    for name in filter(lambda key: key.endswith('.'), tuple(result)):
+        related = result.pop(name)
+        result[name[:-1]] = _remove_dots(related) if related else related
+    return result
+
+
[email protected]('/<database_name>/rest/model/<name>', methods={'GET'})
+@with_pool
+@with_transaction(timeout=_request_timeout)
+@rest
+def search(request, pool, name):
+ Model = pool.get(name)
+ if 'd' in request.args:
+ domain = json.loads(
+ urlsafe_b64decode(request.args['d']).decode(),
+ object_hook=_json_decoder)
+ else:
+ domain = []
+ offset, limit = 0, None
+ if range_ := request.headers.get('Range'):
+ if ',' in range_:
+ range_ = None
+ else:
+ unit, range_ = range_.split('=', 1)
+ start, end = range_.split('-', 1)
+ if start and not end:
+ offset = int(start)
+ elif start and end:
+ offset = int(start)
+ limit = int(end) - offset
+ else:
+ range_ = None
+ else:
+ if 's' in request.args:
+ limit = int(request.args['s'])
+ offset = int(request.args.get('p', 0))
+ if 'o' in request.args:
+ order = json.loads(urlsafe_b64decode(request.args['o']).decode())
+ else:
+ order = None
+ result = Model.search_read(
+ domain, limit=limit, offset=offset, order=order,
+ fields_names=_get_fields(Model, request))
+ result = _remove_dots(result)
+ if range_:
+ if not result and offset:
+ abort(HTTPStatus.REQUESTED_RANGE_NOT_SATISFIABLE)
+ if limit is None:
+ limit = len(result)
+ if len(result) >= limit:
+ count_limit = 10 * (limit + offset)
+ count = Model.search(
+ domain, order=[], limit=count_limit, count=True)
+ if count >= count_limit:
+ count = '*'
+ else:
+ count = offset + len(result)
+ response = Response(
+ json.dumps(result, cls=JSONEncoder),
+ content_type='application/json')
+ response.headers['Accept-Ranges'] = 'records'
+ response.headers['Content-Range'] = (
+ f'records {offset}-{offset + len(result)}/{count}')
+ return response
+ else:
+ return result
+
+
[email protected]('/<database_name>/rest/model/<name>/<int:id>', methods={'GET'})
+@with_pool
+@with_transaction(timeout=_request_timeout)
+@rest
+def get(request, pool, name, id):
+ Model = pool.get(name)
+ try:
+ return _read(Model, request, id)
+ except AccessError:
+ abort(HTTPStatus.NOT_FOUND)
+
+
[email protected]('/<database_name>/rest/model/<name>', methods={'POST'})
+@with_pool
+@with_transaction()
+@rest
+def create(request, pool, name):
+ Model = pool.get(name)
+ data = request.parsed_data
+ record, = Model.create([data])
+ try:
+ return _read(Model, request, record.id)
+ except AccessError:
+ return Response(status=HTTPStatus.NO_CONTENT)
+
+
[email protected]('/<database_name>/rest/model/<name>/<int:id>', methods={'PUT'})
+@with_pool
+@with_transaction()
+@rest
+def update(request, pool, name, id):
+ Model = pool.get(name)
+ data = request.parsed_data
+ try:
+ record, = Model.search([('id', '=', id)])
+ except ValueError:
+ abort(HTTPStatus.NOT_FOUND)
+ Model.write([record], data)
+ try:
+ return _read(Model, request, record.id)
+ except AccessError:
+ return Response(status=HTTPStatus.NO_CONTENT)
+
+
[email protected]('/<database_name>/rest/model/<name>/<int:id>', methods={'DELETE'})
+@with_pool
+@with_transaction()
+@rest
+def delete(request, pool, name, id):
+ Model = pool.get(name)
+ try:
+ record, = Model.search([('id', '=', id)])
+ except ValueError:
+ abort(HTTPStatus.NOT_FOUND)
+ Model.delete([record])
+ return Response(status=HTTPStatus.NO_CONTENT)
+
+
[email protected](
+ '/<database_name>/rest/model/<name>/<int:id>/<action>', methods={'POST'})
[email protected](
+ '/<database_name>/rest/model/<name>/<action>', methods={'POST'})
+@with_pool
+def action(request, pool, name, action, id=None):
+ Model = pool.get(name)
+ data = request.parsed_data or {}
+ rpc = Model.__rpc__.get(action)
+ if not rpc:
+ abort(HTTPStatus.FORBIDDEN)
+
+ @with_transaction(readonly=rpc.readonly)
+ @rest
+ def _action(request, pool, name, action, id):
+ try:
+ if id is not None:
+ try:
+ record, = Model.search([('id', '=', id)])
+ except ValueError:
+ abort(HTTPStatus.NOT_FOUND)
+ if is_instance_method(Model, action):
+ result = getattr(Model, action)(record, **data)
+ else:
+ result = getattr(Model, action)([record], **data)
+ else:
+ result = getattr(Model, action)(**data)
+ except AccessError:
+ abort(HTTPStatus.FORBIDDEN)
+ result = rpc.result(result)
+ if id is not None and result is None:
+ return _read(Model, request, id)
+ elif result is not None:
+ return result
+ else:
+ return Response(status=HTTPStatus.NO_CONTENT)
+ return _action(request, pool, name, action, id)
+
+
[email protected]('/<database_name>/rest/report/<name>/<int:id>', methods={'GET'})
+@with_pool
+@with_transaction()
+@rest
+def report(request, pool, name, id):
+ Report = pool.get(name, type='report')
+ data = request.parsed_data or {}
+ ext, content, _, filename = Report.execute([id], data)
+ filename = f'{filename}.{ext}'
+ mimetype, _ = mimetypes.guess_type(filename)
+ return Response(content,
+ mimetype=mimetype,
+ headers={
+ 'Content-Disposition': f'attachment; filename="{filename}"',
+ })
diff -r bf5a02b5446c -r 44edc21632c6 trytond/trytond/protocols/wrappers.py
--- a/trytond/trytond/protocols/wrappers.py Sun Nov 12 23:43:19 2023 +0100
+++ b/trytond/trytond/protocols/wrappers.py Sat Apr 04 23:03:18 2026 +0200
@@ -18,7 +18,7 @@
from werkzeug.exceptions import abort
from werkzeug.utils import redirect, send_file
from werkzeug.wrappers import Request as _Request
-from werkzeug.wrappers import Response
+from werkzeug.wrappers import Response as _Response
from trytond import backend, config, security
from trytond.exceptions import RateLimitException, UserError, UserWarning
@@ -204,6 +204,44 @@
}
+class Response(_Response):
+
+ def get_json(self, force=False, silent=False):
+ from .jsonrpc import JSONDecoder, json
+
+ if not (force or self.is_json):
+ return None
+
+ data = self.get_data()
+
+ try:
+ return json.loads(data, object_hook=JSONDecoder())
+ except ValueError:
+ if not silent:
+ raise
+
+ return None
+
+
+class JSONBadRequest(exceptions.BadRequest):
+    "Bad request with the message and description of e as JSON body"
+    def __init__(self, e):
+        super().__init__()
+        self.message = e.message
+        self.description = e.description
+
+    def get_body(self, environment=None, scope=None):
+        from .jsonrpc import JSONEncoder, json
+        return json.dumps({
+                'status': self.code,
+                'message': self.message,
+                'description': self.description,
+                }, cls=JSONEncoder)
+
+    def get_headers(self, environment=None, scope=None):
+        return [('Content-Type', 'application/json')]
+
+
Session = collections.namedtuple('Session', 'type username userid token')
@@ -371,7 +409,10 @@
# TODO language
with transaction.set_user(application.user.id), \
check_access():
- response = func(request, *args, **kwargs)
+ try:
+ response = func(request, *args, **kwargs)
+ except (UserError, UserWarning) as e:
+ response = JSONBadRequest(e)
if not isinstance(response, Response) and json:
response = Response(json_.dumps(response, cls=JSONEncoder),
content_type='application/json')
diff -r bf5a02b5446c -r 44edc21632c6 trytond/trytond/res/user.py
--- a/trytond/trytond/res/user.py Sun Nov 12 23:43:19 2023 +0100
+++ b/trytond/trytond/res/user.py Sat Apr 04 23:03:18 2026 +0200
@@ -999,7 +999,9 @@
key = fields.Char("Key", required=True, strip=False)
user = fields.Many2One('res.user', "User")
- application = fields.Selection([], "Application", required=True)
+ application = fields.Selection([
+ ('rest', "REST API"),
+ ], "Application", required=True)
state = fields.Selection([
('requested', "Requested"),
('validated', "Validated"),
diff -r bf5a02b5446c -r 44edc21632c6 trytond/trytond/tests/test_rest.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/trytond/trytond/tests/test_rest.py Sat Apr 04 23:03:18 2026 +0200
@@ -0,0 +1,300 @@
+# This file is part of Tryton. The COPYRIGHT file at the top level of this
+# repository contains the full copyright notices and license terms.
+
+import json
+from base64 import urlsafe_b64encode
+from unittest.mock import patch
+
+try:
+ from http import HTTPStatus
+except ImportError:
+ from http import client as HTTPStatus
+
+from trytond.pool import Pool
+from trytond.tests.test_tryton import RouteTestCase
+
+
+class RESTTestCase(RouteTestCase):
+ key = None
+ module = 'res'
+ language = 'fr'
+
+ def setUp(self):
+ super().setUp()
+ patcher = patch('trytond.res.user._send_email')
+ patcher.start()
+ self.addCleanup(patcher.stop)
+
+ @classmethod
+ def setUpDatabase(cls):
+ pool = Pool()
+ User = pool.get('res.user')
+ UserApplication = pool.get('res.user.application')
+ admin, = User.search([('login', '=', 'admin')])
+ admin.email = '[email protected]'
+ admin.save()
+ application = UserApplication(user=admin, application='rest')
+ application.save()
+ cls.key = application.key
+
+ @property
+ def headers(self):
+ return {
+ 'Authorization': f'bearer {self.key}',
+ }
+
+ def test_search(self):
+ "Test search"
+ response = self.client().get(
+ f'{self.db_name}/rest/model/res.user', headers=self.headers,
+ query_string=[
+ ('d', urlsafe_b64encode(json.dumps(
+ [('login', '=', 'admin')]).encode())),
+ ])
+
+ self.assertEqual(response.status_code, HTTPStatus.OK)
+ self.assertEqual(
+ response.json,
+ [{'id': 1, '__name__': 'res.user', 'rec_name': 'Administrator'}])
+ self.assertEqual(response.headers.get('Content-Language'), 'fr')
+
+ def test_search_range(self):
+ "Test search range"
+ c = self.client()
+
+ headers = self.headers.copy()
+
+ size = len(
+ c.get(f'{self.db_name}/rest/model/ir.lang', headers=headers).json)
+
+ for range_, content_range, length in [
+ ('records=2-', f'records 2-{size}/{size}', size - 2),
+ ('records=2-4', f'records 2-4/{size}', 2),
+ ('records=2-4, 6-7', None, size),
+ ('records=-2', None, size),
+ ]:
+ with self.subTest(range_=range_):
+ headers['Range'] = range_
+ response = c.get(
+ f'{self.db_name}/rest/model/ir.lang', headers=headers)
+
+ self.assertEqual(response.status_code, HTTPStatus.OK)
+ self.assertEqual(
+ response.headers.get('Content-Range'), content_range)
+ self.assertEqual(len(response.json), length)
+
+ headers['Range'] = f'records={size + 1}-'
+ response = c.get(f'{self.db_name}/rest/model/ir.lang', headers=headers)
+ self.assertEqual(
+ response.status_code,
+ HTTPStatus.REQUESTED_RANGE_NOT_SATISFIABLE)
+
+ def test_search_limit(self):
+ "Test search limit"
+ response = self.client().get(
+ f'{self.db_name}/rest/model/ir.lang', headers=self.headers,
+ query_string=[('s', 2)])
+
+ self.assertEqual(response.status_code, HTTPStatus.OK)
+ self.assertEqual(len(response.json), 2)
+
+ def test_search_offset(self):
+ "Test search offset"
+ c = self.client()
+
+ size = len(
+ c.get(
+ f'{self.db_name}/rest/model/ir.lang',
+ headers=self.headers).json)
+
+ response = c.get(
+ f'{self.db_name}/rest/model/ir.lang', headers=self.headers,
+ query_string=[('p', 2)])
+
+ self.assertEqual(response.status_code, HTTPStatus.OK)
+ self.assertEqual(len(response.json), size - 2)
+
+ def test_search_order(self):
+ "Test search order"
+ response = self.client().get(
+ f'{self.db_name}/rest/model/ir.lang', headers=self.headers,
+ query_string=[
+ ('o', urlsafe_b64encode(json.dumps(
+ [('id', 'ASC')]).encode()))])
+
+ self.assertEqual(response.status_code, HTTPStatus.OK)
+ result = response.json
+ self.assertEqual(result, sorted(result, key=lambda x: x['id']))
+
+ def test_get(self):
+ "Test get"
+ response = self.client().get(
+ f'{self.db_name}/rest/model/res.user/1', headers=self.headers)
+
+ self.assertEqual(
+ response.json,
+ {'id': 1, '__name__': 'res.user', 'rec_name': 'Administrator'})
+
+ def test_get_fields(self):
+ "Test get fields"
+ response = self.client().get(
+ f'{self.db_name}/rest/model/res.user/1', headers=self.headers,
+ query_string=[
+ ('f', 'name'),
+ ('f', 'login'),
+ ('f', 'groups.name'),
+ ])
+
+ self.assertEqual(
+ response.json,
+ {'id': 1, '__name__': 'res.user',
+ 'name': "Administrator", 'login': 'admin',
+ 'groups': [
+ {'id': 1, '__name__': 'res.group',
+ 'name': 'Administration'}],
+ })
+
+ def test_get_fields_2many(self):
+ "Test get only xxx2many fields"
+ response = self.client().get(
+ f'{self.db_name}/rest/model/res.user/1', headers=self.headers,
+ query_string=[
+ ('f', 'groups'),
+ ])
+
+ self.assertEqual(
+ response.json,
+ {'id': 1, '__name__': 'res.user',
+ 'groups': [1],
+ })
+
+ def test_get_not_found(self):
+ "Test get not found"
+ response = self.client().get(
+ f'{self.db_name}/rest/model/res.user/42', headers=self.headers)
+
+ self.assertEqual(response.status_code, HTTPStatus.NOT_FOUND)
+
+ def test_create(self):
+ "Test create"
+ response = self.client().post(
+ f'{self.db_name}/rest/model/res.user', headers=self.headers,
+ json={'login': "test create"})
+
+ self.assertEqual(response.status_code, HTTPStatus.OK)
+ self.assertEqual(
+ response.json,
+ {**response.json,
+ '__name__': 'res.user',
+ 'rec_name': "test create",
+ })
+ id = response.json['id']
+ self.assertGreaterEqual(id, 0)
+
+ def test_update(self):
+ "Test update"
+ c = self.client()
+
+ response = c.post(
+ f'{self.db_name}/rest/model/res.user', headers=self.headers,
+ json={'login': "test update"})
+ id = response.json['id']
+
+ response = c.put(
+ f'{self.db_name}/rest/model/res.user/{id}', headers=self.headers,
+ json={'login': "test updated"})
+
+ self.assertEqual(response.status_code, HTTPStatus.OK)
+ self.assertEqual(
+ response.json,
+ {'id': id, '__name__': 'res.user', 'rec_name': 'test updated'})
+
+ def test_update_not_found(self):
+ "Test update not found"
+ response = self.client().put(
+ f'{self.db_name}/rest/model/res.user/42', headers=self.headers,
+ json={'name': "Administrator"})
+
+ self.assertEqual(response.status_code, HTTPStatus.NOT_FOUND)
+
+ def test_delete(self):
+ "Test delete"
+ c = self.client()
+
+ response = c.post(
+ f'{self.db_name}/rest/model/res.group', headers=self.headers,
+ json={'name': 'test delete'})
+ id = response.json['id']
+
+ response = c.delete(
+ f'{self.db_name}/rest/model/res.group/{id}', headers=self.headers)
+
+ self.assertEqual(response.status_code, HTTPStatus.NO_CONTENT)
+
+ def test_delete_not_found(self):
+ "Test delete not found"
+ response = self.client().delete(
+ f'{self.db_name}/rest/model/res.group/42', headers=self.headers)
+
+ self.assertEqual(response.status_code, HTTPStatus.NOT_FOUND)
+
+ def test_button(self):
+ "Test button"
+ response = self.client().post(
+ f'{self.db_name}/rest/model/res.user/1/reset_password',
+ headers=self.headers)
+
+ self.assertEqual(response.status_code, HTTPStatus.OK)
+ self.assertEqual(
+ response.json,
+ {'id': 1, '__name__': 'res.user', 'rec_name': 'Administrator'})
+
+ def test_button_no_record(self):
+ "Test button without record"
+ response = self.client().post(
+ f'{self.db_name}/rest/model/res.user/get_preferences',
+ headers=self.headers)
+
+ self.assertEqual(response.status_code, HTTPStatus.OK)
+ self.assertIsInstance(response.json, dict)
+
+ def test_button_data(self):
+ "Test button with data"
+ response = self.client().post(
+ f'{self.db_name}/rest/model/res.user/1/reset_password',
+ json={
+ 'length': 12,
+ },
+ headers=self.headers)
+
+ self.assertEqual(response.status_code, HTTPStatus.OK)
+
+ def test_button_not_found(self):
+ "Test button not found"
+ response = self.client().post(
+ f'{self.db_name}/rest/model/res.user/42/reset_password',
+ headers=self.headers)
+
+ self.assertEqual(response.status_code, HTTPStatus.NOT_FOUND)
+
+ def test_button_no_button(self):
+ "Test no button"
+ response = self.client().post(
+ f'{self.db_name}/rest/model/res.user/1/validate',
+ headers=self.headers)
+
+ self.assertEqual(response.status_code, HTTPStatus.FORBIDDEN)
+
+ def test_report(self):
+ "Test report"
+ c = self.client()
+
+ response = c.post(
+ f'{self.db_name}/rest/model/res.user/1/reset_password',
+ headers=self.headers)
+ response = c.get(
+ f'{self.db_name}/rest/report/res.user.email_reset_password/1',
+ headers=self.headers)
+
+ self.assertEqual(response.status_code, HTTPStatus.OK)
+ self.assertEqual(response.mimetype, 'text/html')
diff -r bf5a02b5446c -r 44edc21632c6 trytond/trytond/wsgi.py
--- a/trytond/trytond/wsgi.py Sun Nov 12 23:43:19 2023 +0100
+++ b/trytond/trytond/wsgi.py Sat Apr 04 23:03:18 2026 +0200
@@ -277,3 +277,4 @@
import trytond.bus # noqa: E402,F401
import trytond.protocols.dispatcher # noqa: E402,F401
+import trytond.protocols.rest # noqa: E402,F401