#36219: Django Celery: Kombu connection refused error. OperationalError: [Errno
111] Connection refused
------------------------------+-------------------------------------------
     Reporter:  Anshul        |                     Type:  Bug
       Status:  new           |                Component:  Error reporting
      Version:  4.2           |                 Severity:  Normal
     Keywords:  celery kombu  |             Triage Stage:  Unreviewed
    Has patch:  0             |      Needs documentation:  0
  Needs tests:  0             |  Patch needs improvement:  0
Easy pickings:  0             |                    UI/UX:  0
------------------------------+-------------------------------------------
 This seems odd, and maybe I'm missing something, but whenever I send tasks
 to the Celery queue, it suddenly raises this error:


 {{{
 AttributeError: 'ChannelPromise' object has no attribute 'value'

 }}}

 The issue is reproducible when I call the API request twice, which
 triggers the async task. The error appears starting from the second
 request. However, if I cancel the request in between and send another one
 (e.g., using a tool like Postman), the issue is resolved.

 Celery Settings:


 {{{
 CELERY_RESULT_BACKEND           = 'django-db'
 CELERY_BROKER_URL               = 'sqs://'
 CELERY_BROKER_TRANSPORT_OPTIONS = {
     'region' : 'us-south-1', #temp name
     'visibility-timeout' : 3600,
     'polling-interval' : 10
 }
 CELERY_ACCEPT_CONTENT           = ['application/json']
 CELERY_TASK_SERIALIZER          = 'json'
 CELERY_RESULT_SERIALIZER        = 'json'
 CELERY_TIMEZONE                 = 'Asia/Kolkata'

 }}}
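
The Celery documentation for the SQS transport spells these options with underscores (`visibility_timeout`, `polling_interval`), whereas the settings above use hyphens. Below is a minimal sketch, assuming the transport does not recognize the hyphenated keys. `normalize_transport_options` is a hypothetical helper, not a Celery or kombu API, and this observation may well be unrelated to the reported error.

```python
def normalize_transport_options(options):
    """Return a copy of `options` with hyphens in key names replaced by underscores."""
    return {key.replace('-', '_'): value for key, value in options.items()}


# Options as written in the report (hyphenated key names).
reported = {
    'region': 'us-south-1',  # placeholder region name from the report
    'visibility-timeout': 3600,
    'polling-interval': 10,
}

# Same values, with the key spelling used in the Celery SQS documentation.
CELERY_BROKER_TRANSPORT_OPTIONS = normalize_transport_options(reported)
print(CELERY_BROKER_TRANSPORT_OPTIONS)
```

In a real settings file the simpler option is to write the underscore keys directly; the helper only makes the difference explicit.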

 **Packages:**

 {{{
 celery = ">=5.0.5"
 django = "==4.2.16"
 kombu = "==5.4.2"
 django-celery-beat = ">=2.0.0"
 django-celery-results = ">=2.2.0"

 }}}

 djangoproject/djangoproject/__init__.py

 {{{
 from .celery import app as celery_app

 __all__ = ['celery_app']
 }}}

 djangoproject/djangoproject/celery.py


 {{{
 import os

 from celery import Celery

 from project.settings import settings

 # set the default Django settings module for the 'celery' program.
 # settings are kept inside a separate folder for multiple envs:
 # [project/settings/settings_prod.py, project/settings/settings_stag.py,
 #  project/settings/settings.py]
 os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'project.settings.settings')

 app = Celery('project')

 app.config_from_object('django.conf:settings', namespace='CELERY')

 # Load task modules from all registered Django app configs.
 app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)


 @app.task(bind=True)
 def debug_task(self):
     pass
 }}}

 **Complete error trace:**


 {{{
 kombu.exceptions.OperationalError: [Errno 111] Connection refused on next try in production. complete error trace:
 Traceback (most recent call last):
   File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/django/core/handlers/exception.py", line 55, in inner
     response = get_response(request)
   File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/django/core/handlers/base.py", line 197, in _get_response
     response = wrapped_callback(request, *callback_args, **callback_kwargs)
   File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/sentry_sdk/integrations/django/views.py", line 94, in sentry_wrapped_callback
     return callback(request, *args, **kwargs)
   File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/django/views/decorators/csrf.py", line 56, in wrapper_view
     return view_func(*args, **kwargs)
   File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/django/views/generic/base.py", line 104, in view
     return self.dispatch(request, *args, **kwargs)
   File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/rest_framework/views.py", line 509, in dispatch
     response = self.handle_exception(exc)
   File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/rest_framework/views.py", line 469, in handle_exception
     self.raise_uncaught_exception(exc)
   File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/rest_framework/views.py", line 480, in raise_uncaught_exception
     raise exc
   File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/rest_framework/views.py", line 506, in dispatch
     response = handler(request, *args, **kwargs)
   File "/var/app/current/appname/views/views_user.py", line 230, in patch
     my_task.delay()
   File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/celery/app/task.py", line 444, in delay
     return self.apply_async(args, kwargs)
   File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/sentry_sdk/integrations/celery/__init__.py", line 290, in apply_async
     return f(*args, **kwargs)
   File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/celery/app/task.py", line 594, in apply_async
     return app.send_task(
   File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/sentry_sdk/integrations/celery/__init__.py", line 290, in apply_async
     return f(*args, **kwargs)
   File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/celery/app/base.py", line 801, in send_task
     amqp.send_task_message(P, name, message, **options)
   File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/celery/app/amqp.py", line 518, in send_task_message
     ret = producer.publish(
   File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/sentry_sdk/utils.py", line 1788, in runner
     return sentry_patched_function(*args, **kwargs)
   File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/sentry_sdk/integrations/celery/__init__.py", line 527, in sentry_publish
     return original_publish(self, *args, **kwargs)
   File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/kombu/messaging.py", line 186, in publish
     return _publish(
   File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/kombu/connection.py", line 556, in _ensured
     return fun(*args, **kwargs)
   File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/kombu/messaging.py", line 195, in _publish
     channel = self.channel
   File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/kombu/messaging.py", line 218, in _get_channel
     channel = self._channel = channel()
   File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/kombu/utils/functional.py", line 34, in __call__
     value = self.__value__ = self.__contract__()
   File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/kombu/messaging.py", line 234, in <lambda>
     channel = ChannelPromise(lambda: connection.default_channel)
   File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/kombu/connection.py", line 953, in default_channel
     self._ensure_connection(**conn_opts)
   File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/kombu/connection.py", line 459, in _ensure_connection
     return retry_over_time(
   File "/usr/lib64/python3.8/contextlib.py", line 131, in __exit__
     self.gen.throw(type, value, traceback)
   File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/kombu/connection.py", line 476, in _reraise_as_library_errors
     raise ConnectionError(str(exc)) from exc
 kombu.exceptions.OperationalError: [Errno 111] Connection refused

 }}}
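
`[Errno 111]` on Linux is `ECONNREFUSED`: the TCP connection attempt was actively rejected, which usually means nothing was listening on the target host and port. The sketch below uses only the standard library to probe an endpoint from the same machine as the web process. `tcp_probe` is a hypothetical helper, and the host and port to probe are assumptions, since the report does not say which endpoint refused the connection.

```python
import errno
import socket


def tcp_probe(host, port, timeout=3.0):
    """Try a TCP connect; return None on success or a short error name on failure."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return None
    except OSError as exc:
        # exc.errno can be None (e.g. on timeouts); fall back to the exception class name.
        return errno.errorcode.get(exc.errno, type(exc).__name__)
```

For example, `tcp_probe('localhost', 5672)` would check the default AMQP port on the local machine; a return value of `'ECONNREFUSED'` would match the error in the traceback above.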


 **celery-worker output**


 {{{
 User information: uid=0 euid=0 gid=0 egid=0

   warnings.warn(SecurityWarning(ROOT_DISCOURAGED.format(

  -------------- [email protected] v5.4.0 (opalescent)
 --- ***** -----
 -- ******* ---- Linux-6.1.128-136.201.amzn2023.x86_64-x86_64-with-glibc2.34 2025-02-28 19:19:41
 - *** --- * ---
 - ** ---------- [config]
 - ** ---------- .> app:         djangoproject:0x4f535e7c8v70
 - ** ---------- .> transport:   sqs://localhost//
 - ** ---------- .> results:
 - *** --- * --- .> concurrency: 2 (solo)
 -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
 --- ***** -----
  -------------- [queues]
                 .> celery           exchange=celery(direct) key=celery


 [tasks]
   . djangoproject.celery.debug_task
   . djangoproject.myapp.tasks.activity

 [2025-02-28 19:19:41,982: WARNING/MainProcess] /var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/celery/worker/consumer/consumer.py:508: CPendingDeprecationWarning: The broker_connection_retry configuration setting will no longer determine
 whether broker connection retries are made during startup in Celery 6.0 and above.
 If you wish to retain the existing behavior for retrying connections on startup,
 you should set broker_connection_retry_on_startup to True.
   warnings.warn(

 [2025-02-28 19:19:41,995: INFO/MainProcess] Found credentials in environment variables.
 [2025-02-28 19:19:42,109: INFO/MainProcess] Connected to sqs://localhost//

 }}}

 Steps tried so far:
 1. Starting the worker on a separate server (right now it runs on the
 same server, under supervisor).
 2. Simplifying **settings.py** to a single file (removed the prod, stag,
 etc. complexity).
 3. Adding the broker URL and some other params to app = Celery() in
 **celery.py**.
 4. Upgrading Python 3.8 to 3.9.
 5. Replacing **task.delay** with **task.apply_async**.


 I suspect it might be due to a version incompatibility, but I'm not sure
 about that.
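
One way to narrow down the version question: the package list above gives `celery` only as a range (`>=5.0.5`), while the worker banner shows v5.4.0 and `kombu` is pinned at 5.4.2. The sketch below prints the versions actually installed in the environment, using the standard-library `importlib.metadata` (available since Python 3.8). `installed_versions` is a hypothetical helper name, and running it in both the web and the worker environment is a suggestion, not something the report describes.

```python
from importlib import metadata  # standard library since Python 3.8

# Distribution names taken from the package list in the report.
PACKAGES = ['celery', 'kombu', 'django', 'django-celery-beat', 'django-celery-results']


def installed_versions(names):
    """Map each distribution name to its installed version, or None if it is absent."""
    versions = {}
    for name in names:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


for name, version in installed_versions(PACKAGES).items():
    print(f'{name}: {version or "not installed"}')
```

Comparing the output from the web process's virtualenv with the worker's would show whether the two are running the same celery and kombu releases.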
-- 
Ticket URL: <https://code.djangoproject.com/ticket/36219>