我读了this但无法理解。我的技能有限。我尝试了stackoverflow的几个答案,但它们不适用于我正在使用的Celery4
我有tasks.py
和pn_list
在里面
@shared_task
def pn_list(user_ids: typing.List[int], title, msg, extra: typing.Dict = None):
...
当我在serializer
中调用这个函数时,我必须使用pn_list.delay(...)
调用它。这就是问题所在
尝试:
我已经设置了我的CELERY_TASK_ALWAYS_EAGER=True
和CELERY_TASK_EAGER_PROPAGATES=True
{
错误:
File "/Users/sarit/mein-codes/multy_herr/multy_herr/tweets/api/serializers.py", line 208, in create
pn_list.delay(tmp_ids, title, msg, notification_msg)
File "/Users/sarit/.pyenv/versions/multy_herr/lib/python3.7/site-packages/celery/app/task.py", line 427, in delay
return self.apply_async(args, kwargs)
File "/Users/sarit/.pyenv/versions/multy_herr/lib/python3.7/site-packages/celery/app/task.py", line 552, in apply_async
link=link, link_error=link_error, **options)
File "/Users/sarit/.pyenv/versions/multy_herr/lib/python3.7/site-packages/celery/app/task.py", line 772, in apply
propagate=throw, app=self._get_app(),
File "/Users/sarit/.pyenv/versions/multy_herr/lib/python3.7/site-packages/celery/app/trace.py", line 295, in build_tracer
backend = task.backend
File "/Users/sarit/.pyenv/versions/multy_herr/lib/python3.7/site-packages/celery/app/task.py", line 1030, in backend
return self.app.backend
File "/Users/sarit/.pyenv/versions/multy_herr/lib/python3.7/site-packages/kombu/utils/objects.py", line 44, in __get__
value = obj.__dict__[self.__name__] = self.__get(obj)
File "/Users/sarit/.pyenv/versions/multy_herr/lib/python3.7/site-packages/celery/app/base.py", line 1207, in backend
return self._get_backend()
File "/Users/sarit/.pyenv/versions/multy_herr/lib/python3.7/site-packages/celery/app/base.py", line 925, in _get_backend
self.loader)
File "/Users/sarit/.pyenv/versions/multy_herr/lib/python3.7/site-packages/celery/app/backends.py", line 74, in by_url
return by_name(backend, loader), url
File "/Users/sarit/.pyenv/versions/multy_herr/lib/python3.7/site-packages/celery/app/backends.py", line 54, in by_name
cls = symbol_by_name(backend, aliases)
File "/Users/sarit/.pyenv/versions/multy_herr/lib/python3.7/site-packages/kombu/utils/imports.py", line 57, in symbol_by_name
module = imp(module_name, package=package, **kwargs)
File "/Users/sarit/.pyenv/versions/3.7.3/lib/python3.7/importlib/__init__.py", line 127, in import_module
return _bootstrap._gcd_import(name[level:], package, level)
File "<frozen importlib._bootstrap>", line 1006, in _gcd_import
File "<frozen importlib._bootstrap>", line 983, in _find_and_load
File "<frozen importlib._bootstrap>", line 965, in _find_and_load_unlocked
ModuleNotFoundError: No module named "''"
我的解决方法是if else
调用不同的函数。
条件是CELERY_TASK_ALWAYS_EAGER == True
如果不调用pn_list.delay(...)
,将调用pn_list
如果Django和Celery4有共同的变量来解决这个问题,就像在Celery3中一样,那就太好了
问题:
在viewsets
正在使用的serializer
中测试pn_list
是否有简单的技术
目前没有回答
相关问题 更多 >
编程相关推荐