Запуск Celery-задачи после выполнения всех задач
В этой заметке я попробую разобраться в нехитрой задаче — запуск Celery-задачи X после выполнения всех задач Y. Некая задача X должна получить в качестве входного параметра результат отработки всех задач Y. Это вроде бы решается в лоб, ручным сохранением промежуточного результата отработки каждой задачи Y, но интересно все же добиться того же самого средствами Celery, без ручных манипуляций с базой данных и написания лишнего кода.
Данная заметка ни в коем случае не претендует на развернутое или исчерпывающее руководство и должна восприниматься скорее как шпаргалка. По тексту всей заметки я буду расставлять ссылки на релевантную документацию. Рекомендую обращаться к официальной документации в первую очередь и конечно же все пробовать самим. Если вы заметили неточность или хотели бы, чтобы я добавил что-то интересное, напишите мне. Для простоты разворачивания тестового стенда я буду использовать докерезированый Redis в качестве брокера. В вашем конкретном случае брокером может выступать что-то другое, для демонстрационных целей задачи этот момент не принципиальный.
Итак, начнем с постановки задачи. Я выдумал несложную и понятную задачу, на примере которой хочу показать, как в Celery принято реализовывать патерн, в котором выполняется задача с результатами выполнения предыдущих.
Предположим у нас есть 2 задачи. Первая считает степень числа, а вторая — выводит сумму всех результатов отработок задач первого типа. В реальной жизни этот патерн применим допустим такой ситуации, когда вы в одной задаче генерируете PDF платежку по абоненту, а в задаче второго типа склеиваете все инвойсы в один файл. Разумеется мы можем складывать промежуточный результат допустим в базу и конечно мы можем что-то придумать с тем, чтобы запустить задачу второго типа после отработки последней задачи первого. Однако как только начинаешь думать об реализации такого поведения на ум приходят одни костыли не гибкие решения. Но у Celery оказывается есть отличное решение, описанное в Canvas: Designing Work-flows! Не буду вдаваться в рассуждения, а сразу покажу код.
В первую очередь давайте создадим папку workflow
, а в ней пустой __init__.py
файл. Затем в папке workflow
создадим Celery-приложение назвав модуль celery.py
. Я сознательно упущу все малозначительные для целей этой заметки детали. Production код все равно будет сложнее.
from celery import Celery
app = Celery(
'workflow',
broker='redis://localhost',
include=['workflow.tasks'])
if __name__ == '__main__':
app.start()
Второй важный модуль который нам понадобится это tasks
(задачи). Создадим в папке workflow
файл tasks.py
с следующим содержимым:
from time import sleep
import random
from .celery import app
@app.task
def power(value, expo):
sleep(random.randint(10, 1000) / 100.0)
return value ** expo
@app.task
def amass(values):
print(sum(values))
Итак, пока что код достаточно простой, чтобы его комментировать, кроме пожалуй одного момента — я добавил в функцию power
произвольную задержу, чтобы симулировать сложную задачу. В противном случае все очень быстро отрабатывает и сложно заметить асинхронную природу задачи.
А теперь самое интересное. Во первых мы хотим асинхронно выполнить power
(допустим десять раз). А во вторых, результат всех десяти отработок передать в amass
. В Celery это делается это при помощи интересного примитива chord
, описанного в разделе The Primitives. chord
это не функция, а класс. А если быть еще более точным алиас на класс celery.canvas._chord
. Итак, создадим в папке workflow
файл producer.py
с следующим содержимым:
from celery import chord
from .tasks import amass, power
def main():
tasks = (power.s(i, 2) for i in range(10))
callback = amass.s()
return chord(tasks)(callback)
if __name__ == '__main__':
print('Start testing workflow...')
main()
На этом собственно и все. Наш тестовый проект готов. По финалу должен был получиться проект со следующей структурой:
$ tree -a
.
└── workflow
├── __init__.py
├── celery.py
├── producer.py
└── tasks.py
1 directory, 4 files
Запустим Redis и попробуем разобраться в том, как это работает. Для целей примера я буду использовать докерезированный Redis с настройками по умолчанию:
$ docker run -d -p 6379:6379 redis
Запустим celery в режиме воркера, который будет слушать нашу очередь и убедимся по его выводу в консоль, что все хорошо, он готов обслуживать задачи:
$ celery -A workflow worker --loglevel=INFO
Ну и финальный аккорд, запустим продюсер, накинуть так сказать на вентилятор задач:
$ python -m workflow.producer
Я не знаю, в каком году вы это будете читать, но у нас тут в 2021 Celery при такой конфигурации, которую я описал выше, выбросит NotImplementedError, с следующим сообщением:
NotImplementedError: Starting chords requires a result backend to be configured.
Что в принципе логично. Никакой магии нет, Celery где-то нужно сохранить результат отработки задач. Единственное, что не кажется мне понятным здесь, так это NotImplementedError, будто бы это может реализовано позже. В общем открываем файл workflow/celery.py
и изменяем его следующим образом:
from celery import Celery
app = Celery(
'workflow',
broker='redis://localhost',
backend='redis://localhost',
include=['workflow.tasks'])
if __name__ == '__main__':
app.start()
Тут, опять же для простоты примера, в качестве результирующего бекенда я выбрал тот же Redis. Придется перезапустить воркер, а также повторить запуск продюсера после добавления бекенда. В конце концов вы должны увидеть, как продюсер быстро накидал задачек и завершил работу. И если у вас было открыто отдельное окно/вкладка с воркером, то вы могли обратить внимание на то, что воркер, после того как продюсер завершил свою работу, еще какое-то время отрабатывал из-за строки с sleep()
в теле задачки power()
. Кроме того, в выводе воркера среди прочего будет что-то похожее на следующее:
[2021-09-15 01:16:11,208: WARNING/ForkPoolWorker-8] 285
[2021-09-15 01:16:11,209: WARNING/ForkPoolWorker-8]
что доказывает отработку amass()
в самом конце с результатами 10 вызовов power()
.
Теперь давайте взглянем чуть подробнее на то, что мы использовали, чтобы добиться результата. Самым интересным из того, что мы написали, является использование примитива chord
. Как гласит документация, примитив chord
предназначен для тех ситуаций, когда нам необходимо выполнить параллельно список задач, а по завершению всех этих задач необходимо выполнить колбек. Вот так все просто, мы берем chord(...)(...)
у которого заголовком (первая группа скобок) идет список задач, а телом — колбек. Колбеком в нашем случае выступает callback
:
callback = amass.s()
Почему вообще колбеки это хорошо? Ожидание одной задачей результата выполнения другой на самом деле неэффективно и может даже приводить к локам, если пул воркеров исчерпан. Вместо этого практически всегда рекомендуется делать акцент на асинхронный дизайн очередей, например, используя колбеки. Я хотел бы еще прояснить конструкцию chord(...)(...)
, чтобы не вносить путаницу. Так как chord
это класс (алиас на класс), то фактически эта конструкция эквивалентна следующей:
group = chord(tasks)
group(callback)
Вторая строка, как наверное можно догадаться законна из-за перегрузки __call__()
. Стоит также отметить, что на протяжении всей заметки я использовал формат task.s(...)
, что не является вызовом задачки, а лишь создает ее сигнатуру, для использования в будущем. Говоря простым языком — передавая в chord
список из сигнатур, мы фактически помещали в результирующий бекенд сериализованые задачки. Вот почему мы ловили NotImplementedError в самом начале и вот почему нам пришлось указать Celery бекенд backend='redis://localhost'
.
На что стоит обратить внимание и проработать отдельно? Если хоть одна из задач из списка провалится и по факту не выполнится, то колбек зарегистрированный при помощи примитива chord
не будет вызван ни разу. Об этом следует позаботится отдельно.
Все что я здесь написал оформлено в виде рабочего проекта, который можно получить по следующей ссылке: https://github.com/sergeyklay/celerypg