Сергей Яковлев
Мой блокнот, мастерская, место где я делюсь своим опытом и мыслями

Запуск Celery-задачи после выполнения всех задач

В этой заметке я попробую разобраться в нехитрой задаче — запуск Celery-задачи X после выполнения всех задач Y. Некая задача X должна получить в качестве входного параметра результат отработки всех задач Y. Это вроде бы решается в лоб, ручным сохранением промежуточного результата отработки каждой задачи Y, но интересно все же добиться того же самого средствами Celery, без ручных манипуляций с базой данных и написания лишнего кода.

Данная заметка ни в коем случае не претендует на развернутое или исчерпывающее руководство и должна восприниматься скорее как шпаргалка. По тексту всей заметки я буду расставлять ссылки на релевантную документацию. Рекомендую обращаться к официальной документации в первую очередь и конечно же все пробовать самим. Если вы заметили неточность или хотели бы, чтобы я добавил что-то интересное, напишите мне. Для простоты разворачивания тестового стенда я буду использовать докерезированый Redis в качестве брокера. В вашем конкретном случае брокером может выступать что-то другое, для демонстрационных целей задачи этот момент не принципиальный.

Итак, начнем с постановки задачи. Я выдумал несложную и понятную задачу, на примере которой хочу показать, как в Celery принято реализовывать патерн, в котором выполняется задача с результатами выполнения предыдущих.

Предположим у нас есть 2 задачи. Первая считает степень числа, а вторая — выводит сумму всех результатов отработок задач первого типа. В реальной жизни этот патерн применим допустим такой ситуации, когда вы в одной задаче генерируете PDF платежку по абоненту, а в задаче второго типа склеиваете все инвойсы в один файл. Разумеется мы можем складывать промежуточный результат допустим в базу и конечно мы можем что-то придумать с тем, чтобы запустить задачу второго типа после отработки последней задачи первого. Однако как только начинаешь думать об реализации такого поведения на ум приходят одни костыли не гибкие решения. Но у Celery оказывается есть отличное решение, описанное в Canvas: Designing Work-flows! Не буду вдаваться в рассуждения, а сразу покажу код.

В первую очередь давайте создадим папку workflow, а в ней пустой __init__.py файл. Затем в папке workflow создадим Celery-приложение назвав модуль celery.py. Я сознательно упущу все малозначительные для целей этой заметки детали. Production код все равно будет сложнее.

from celery import Celery

app = Celery(
    'workflow',
    broker='redis://localhost',
    include=['workflow.tasks'])

if __name__ == '__main__':
    app.start()

Второй важный модуль который нам понадобится это tasks (задачи). Создадим в папке workflow файл tasks.py с следующим содержимым:

from time import sleep

import random

from .celery import app

@app.task
def power(value, expo):
    sleep(random.randint(10, 1000) / 100.0)
    return value ** expo


@app.task
def amass(values):
    print(sum(values))

Итак, пока что код достаточно простой, чтобы его комментировать, кроме пожалуй одного момента — я добавил в функцию power произвольную задержу, чтобы симулировать сложную задачу. В противном случае все очень быстро отрабатывает и сложно заметить асинхронную природу задачи.

А теперь самое интересное. Во первых мы хотим асинхронно выполнить power (допустим десять раз). А во вторых, результат всех десяти отработок передать в amass. В Celery это делается это при помощи интересного примитива chord, описанного в разделе The Primitives. chord это не функция, а класс. А если быть еще более точным алиас на класс celery.canvas._chord. Итак, создадим в папке workflow файл producer.py с следующим содержимым:

from celery import chord

from .tasks import amass, power


def main():
    tasks = (power.s(i, 2) for i in range(10))
    callback = amass.s()

    return chord(tasks)(callback)


if __name__ == '__main__':
    print('Start testing workflow...')
    main()

На этом собственно и все. Наш тестовый проект готов. По финалу должен был получиться проект со следующей структурой:

$ tree -a
.
└── workflow
    ├── __init__.py
    ├── celery.py
    ├── producer.py
    └── tasks.py

1 directory, 4 files

Запустим Redis и попробуем разобраться в том, как это работает. Для целей примера я буду использовать докерезированный Redis с настройками по умолчанию:

$ docker run -d -p 6379:6379 redis

Запустим celery в режиме воркера, который будет слушать нашу очередь и убедимся по его выводу в консоль, что все хорошо, он готов обслуживать задачи:

$ celery -A workflow worker --loglevel=INFO

Ну и финальный аккорд, запустим продюсер, накинуть так сказать на вентилятор задач:

$ python -m workflow.producer

Я не знаю, в каком году вы это будете читать, но у нас тут в 2021 Celery при такой конфигурации, которую я описал выше, выбросит NotImplementedError, с следующим сообщением:

NotImplementedError: Starting chords requires a result backend to be configured.

Что в принципе логично. Никакой магии нет, Celery где-то нужно сохранить результат отработки задач. Единственное, что не кажется мне понятным здесь, так это NotImplementedError, будто бы это может реализовано позже. В общем открываем файл workflow/celery.py и изменяем его следующим образом:

from celery import Celery

app = Celery(
    'workflow',
    broker='redis://localhost',
    backend='redis://localhost',
    include=['workflow.tasks'])

if __name__ == '__main__':
    app.start()

Тут, опять же для простоты примера, в качестве результирующего бекенда я выбрал тот же Redis. Придется перезапустить воркер, а также повторить запуск продюсера после добавления бекенда. В конце концов вы должны увидеть, как продюсер быстро накидал задачек и завершил работу. И если у вас было открыто отдельное окно/вкладка с воркером, то вы могли обратить внимание на то, что воркер, после того как продюсер завершил свою работу, еще какое-то время отрабатывал из-за строки с sleep() в теле задачки power(). Кроме того, в выводе воркера среди прочего будет что-то похожее на следующее:

[2021-09-15 01:16:11,208: WARNING/ForkPoolWorker-8] 285
[2021-09-15 01:16:11,209: WARNING/ForkPoolWorker-8]

что доказывает отработку amass() в самом конце с результатами 10 вызовов power().

Теперь давайте взглянем чуть подробнее на то, что мы использовали, чтобы добиться результата. Самым интересным из того, что мы написали, является использование примитива chord. Как гласит документация, примитив chord предназначен для тех ситуаций, когда нам необходимо выполнить параллельно список задач, а по завершению всех этих задач необходимо выполнить колбек. Вот так все просто, мы берем chord(...)(...) у которого заголовком (первая группа скобок) идет список задач, а телом — колбек. Колбеком в нашем случае выступает callback:

callback = amass.s()

Почему вообще колбеки это хорошо? Ожидание одной задачей результата выполнения другой на самом деле неэффективно и может даже приводить к локам, если пул воркеров исчерпан. Вместо этого практически всегда рекомендуется делать акцент на асинхронный дизайн очередей, например, используя колбеки. Я хотел бы еще прояснить конструкцию chord(...)(...), чтобы не вносить путаницу. Так как chord это класс (алиас на класс), то фактически эта конструкция эквивалентна следующей:

group = chord(tasks)
group(callback)

Вторая строка, как наверное можно догадаться законна из-за перегрузки __call__(). Стоит также отметить, что на протяжении всей заметки я использовал формат task.s(...), что не является вызовом задачки, а лишь создает ее сигнатуру, для использования в будущем. Говоря простым языком — передавая в chord список из сигнатур, мы фактически помещали в результирующий бекенд сериализованые задачки. Вот почему мы ловили NotImplementedError в самом начале и вот почему нам пришлось указать Celery бекенд backend='redis://localhost'.

На что стоит обратить внимание и проработать отдельно? Если хоть одна из задач из списка провалится и по факту не выполнится, то колбек зарегистрированный при помощи примитива chord не будет вызван ни разу. Об этом следует позаботится отдельно.

Все что я здесь написал оформлено в виде рабочего проекта, который можно получить по следующей ссылке: https://github.com/sergeyklay/celerypg

Вернуться в начало